1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
39pub struct ClosureExpr {
45 pub(crate) expr: DebugExpr,
46 pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
49}
50
51impl Clone for ClosureExpr {
52 fn clone(&self) -> Self {
53 Self {
54 expr: self.expr.clone(),
55 singleton_refs: self
56 .singleton_refs
57 .iter()
58 .map(|(node, is_mut)| {
59 let HydroNode::Reference {
60 inner,
61 kind,
62 metadata,
63 } = node
64 else {
65 panic!("singleton_refs should only contain HydroNode::Reference");
66 };
67 (
68 HydroNode::Reference {
69 inner: SharedNode(Rc::clone(&inner.0)),
70 kind: *kind,
71 metadata: metadata.clone(),
72 },
73 *is_mut,
74 )
75 })
76 .collect(),
77 }
78 }
79}
80
81impl Hash for ClosureExpr {
82 fn hash<H: Hasher>(&self, state: &mut H) {
83 self.expr.hash(state);
84 }
88}
89
90impl serde::Serialize for ClosureExpr {
91 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
92 use serde::ser::SerializeStruct;
93 let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
94 s.serialize_field("expr", &self.expr)?;
95 s.serialize_field(
96 "singleton_refs",
97 &SerializableSingletonRefs(&self.singleton_refs),
98 )?;
99 s.end()
100 }
101}
102
103struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
104
105impl serde::Serialize for SerializableSingletonRefs<'_> {
106 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
107 use serde::ser::SerializeSeq;
108 let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
109 for (node, is_mut) in self.0.iter() {
110 seq.serialize_element(&(node, is_mut))?;
111 }
112 seq.end()
113 }
114}
115
116impl Debug for ClosureExpr {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 Debug::fmt(&self.expr, f)
119 }
120}
121
122impl Display for ClosureExpr {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 Display::fmt(&self.expr, f)
125 }
126}
127
128impl From<syn::Expr> for ClosureExpr {
129 fn from(expr: syn::Expr) -> Self {
130 Self {
131 expr: DebugExpr(Box::new(expr)),
132 singleton_refs: Vec::new(),
133 }
134 }
135}
136
137impl From<DebugExpr> for ClosureExpr {
138 fn from(expr: DebugExpr) -> Self {
139 Self {
140 expr,
141 singleton_refs: Vec::new(),
142 }
143 }
144}
145
146impl ClosureExpr {
147 pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
148 Self {
149 expr,
150 singleton_refs,
151 }
152 }
153
154 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
155 Self {
156 expr: self.expr.clone(),
157 singleton_refs: self
158 .singleton_refs
159 .iter()
160 .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
161 .collect(),
162 }
163 }
164
165 pub fn transform_children(
166 &mut self,
167 transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
168 seen_tees: &mut SeenSharedNodes,
169 ) {
170 for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
171 transform(ref_node, seen_tees);
172 }
173 }
174
175 #[cfg(feature = "build")]
178 pub fn emit_tokens(
179 &self,
180 ident_stack: &mut Vec<syn::Ident>,
181 access_counters: &mut HashMap<*const RefCell<HydroNode>, u32>,
182 ) -> TokenStream {
183 if self.singleton_refs.is_empty() {
184 self.expr.0.to_token_stream()
185 } else {
186 assert!(
187 ident_stack.len() >= self.singleton_refs.len(),
188 "ident_stack has {} entries but expected at least {} for singleton_refs",
189 ident_stack.len(),
190 self.singleton_refs.len()
191 );
192 let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
193
194 let mut let_bindings = Vec::new();
195 for ((i, (node, is_mut)), ref_ident) in
196 self.singleton_refs.iter().enumerate().zip(ref_idents)
197 {
198 let HydroNode::Reference { inner, .. } = node else {
199 panic!("singleton_refs should only contain HydroNode::Reference");
200 };
201 let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
202 let counter = access_counters.entry(ptr).or_insert(0);
203 let group = if *is_mut {
204 *counter += 1;
205 let g = *counter;
206 *counter += 1;
207 g
208 } else {
209 *counter
210 };
211
212 let local_ident = handoff_ref_ident(i);
214 let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
215 let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
216 let mut_token = is_mut.then(|| quote!(mut));
217 let binding = quote! {
218 let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
219 };
220 let_bindings.push(binding);
221 }
222
223 let expr = &self.expr.0;
224 quote! {
225 {
226 #( #let_bindings )*
227 #expr
228 }
229 }
230 }
231 }
232}
233
234#[derive(Clone, Hash)]
238pub struct DebugExpr(pub Box<syn::Expr>);
239
240impl serde::Serialize for DebugExpr {
241 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
242 serializer.serialize_str(&self.to_string())
243 }
244}
245
246impl From<syn::Expr> for DebugExpr {
247 fn from(expr: syn::Expr) -> Self {
248 Self(Box::new(expr))
249 }
250}
251
252impl Deref for DebugExpr {
253 type Target = syn::Expr;
254
255 fn deref(&self) -> &Self::Target {
256 &self.0
257 }
258}
259
260impl ToTokens for DebugExpr {
261 fn to_tokens(&self, tokens: &mut TokenStream) {
262 self.0.to_tokens(tokens);
263 }
264}
265
266impl Debug for DebugExpr {
267 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
268 write!(f, "{}", self.0.to_token_stream())
269 }
270}
271
272impl Display for DebugExpr {
273 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
274 let original = self.0.as_ref().clone();
275 let simplified = simplify_q_macro(original);
276
277 write!(f, "q!({})", quote::quote!(#simplified))
280 }
281}
282
283fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
285 if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
286 && is_stageleft_runtime_support_call(&path_expr.path)
288 && let syn::Expr::Block(b) = &call.args[0]
289 && b.block.stmts.len() == 3
290 && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
291 {
293 let mut e = e.clone();
294 while let syn::Expr::Block(ref mut block) = e
295 && block.block.stmts.len() == 1
296 && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
297 {
298 e = inner_e;
299 }
300
301 e
302 } else {
303 expr
304 }
305}
306
307fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
308 if let Some(last_segment) = path.segments.last() {
310 let fn_name = last_segment.ident.to_string();
311 path.segments.len() > 2
312 && path.segments[0].ident == "stageleft"
313 && path.segments[1].ident == "runtime_support"
314 && fn_name.contains("_type_hint")
315 } else {
316 false
317 }
318}
319
320#[derive(Clone, PartialEq, Eq, Hash)]
324pub struct DebugType(pub Box<syn::Type>);
325
326impl From<syn::Type> for DebugType {
327 fn from(t: syn::Type) -> Self {
328 Self(Box::new(t))
329 }
330}
331
332impl Deref for DebugType {
333 type Target = syn::Type;
334
335 fn deref(&self) -> &Self::Target {
336 &self.0
337 }
338}
339
340impl ToTokens for DebugType {
341 fn to_tokens(&self, tokens: &mut TokenStream) {
342 self.0.to_tokens(tokens);
343 }
344}
345
346impl Debug for DebugType {
347 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348 write!(f, "{}", self.0.to_token_stream())
349 }
350}
351
352impl serde::Serialize for DebugType {
353 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
354 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
355 }
356}
357
358fn serialize_backtrace_as_span<S: serde::Serializer>(
359 backtrace: &Backtrace,
360 serializer: S,
361) -> Result<S::Ok, S::Error> {
362 match backtrace.format_span() {
363 Some(span) => serializer.serialize_some(&span),
364 None => serializer.serialize_none(),
365 }
366}
367
368fn serialize_ident<S: serde::Serializer>(
369 ident: &syn::Ident,
370 serializer: S,
371) -> Result<S::Ok, S::Error> {
372 serializer.serialize_str(&ident.to_string())
373}
374
375pub enum DebugInstantiate {
376 Building,
377 Finalized(Box<DebugInstantiateFinalized>),
378}
379
380impl serde::Serialize for DebugInstantiate {
381 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
382 match self {
383 DebugInstantiate::Building => {
384 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
385 }
386 DebugInstantiate::Finalized(_) => {
387 panic!(
388 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
389 )
390 }
391 }
392 }
393}
394
395#[cfg_attr(
396 not(feature = "build"),
397 expect(
398 dead_code,
399 reason = "sink, source unused without `feature = \"build\"`."
400 )
401)]
402pub struct DebugInstantiateFinalized {
403 sink: syn::Expr,
404 source: syn::Expr,
405 connect_fn: Option<Box<dyn FnOnce()>>,
406}
407
408impl From<DebugInstantiateFinalized> for DebugInstantiate {
409 fn from(f: DebugInstantiateFinalized) -> Self {
410 Self::Finalized(Box::new(f))
411 }
412}
413
414impl Debug for DebugInstantiate {
415 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
416 write!(f, "<network instantiate>")
417 }
418}
419
420impl Hash for DebugInstantiate {
421 fn hash<H: Hasher>(&self, _state: &mut H) {
422 }
424}
425
426impl Clone for DebugInstantiate {
427 fn clone(&self) -> Self {
428 match self {
429 DebugInstantiate::Building => DebugInstantiate::Building,
430 DebugInstantiate::Finalized(_) => {
431 panic!("DebugInstantiate::Finalized should not be cloned")
432 }
433 }
434 }
435}
436
437#[derive(Debug, Hash, Clone, serde::Serialize)]
446pub enum ClusterMembersState {
447 Uninit,
449 Stream(DebugExpr),
452 Tee(LocationId, LocationId),
456}
457
458#[derive(Debug, Hash, Clone, serde::Serialize)]
460pub enum HydroSource {
461 Stream(DebugExpr),
462 ExternalNetwork(),
463 Iter(DebugExpr),
464 Spin(),
465 ClusterMembers(LocationId, ClusterMembersState),
466 Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
467 EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
468}
469
470#[cfg(feature = "build")]
471pub trait DfirBuilder {
477 fn singleton_intermediates(&self) -> bool;
479
480 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
482
483 #[expect(clippy::too_many_arguments, reason = "TODO")]
484 fn batch(
485 &mut self,
486 in_ident: syn::Ident,
487 in_location: &LocationId,
488 in_kind: &CollectionKind,
489 out_ident: &syn::Ident,
490 out_location: &LocationId,
491 op_meta: &HydroIrOpMetadata,
492 fold_hooked_idents: &HashSet<String>,
493 );
494 fn yield_from_tick(
495 &mut self,
496 in_ident: syn::Ident,
497 in_location: &LocationId,
498 in_kind: &CollectionKind,
499 out_ident: &syn::Ident,
500 out_location: &LocationId,
501 );
502
503 fn begin_atomic(
504 &mut self,
505 in_ident: syn::Ident,
506 in_location: &LocationId,
507 in_kind: &CollectionKind,
508 out_ident: &syn::Ident,
509 out_location: &LocationId,
510 op_meta: &HydroIrOpMetadata,
511 );
512 fn end_atomic(
513 &mut self,
514 in_ident: syn::Ident,
515 in_location: &LocationId,
516 in_kind: &CollectionKind,
517 out_ident: &syn::Ident,
518 );
519
520 #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
521 fn observe_nondet(
522 &mut self,
523 trusted: bool,
524 location: &LocationId,
525 in_ident: syn::Ident,
526 in_kind: &CollectionKind,
527 out_ident: &syn::Ident,
528 out_kind: &CollectionKind,
529 op_meta: &HydroIrOpMetadata,
530 );
531
532 #[expect(clippy::too_many_arguments, reason = "TODO")]
533 fn merge_ordered(
534 &mut self,
535 location: &LocationId,
536 first_ident: syn::Ident,
537 second_ident: syn::Ident,
538 out_ident: &syn::Ident,
539 in_kind: &CollectionKind,
540 op_meta: &HydroIrOpMetadata,
541 operator_tag: Option<&str>,
542 );
543
544 #[expect(clippy::too_many_arguments, reason = "TODO")]
545 fn create_network(
546 &mut self,
547 from: &LocationId,
548 to: &LocationId,
549 input_ident: syn::Ident,
550 out_ident: &syn::Ident,
551 serialize: Option<&DebugExpr>,
552 sink: syn::Expr,
553 source: syn::Expr,
554 deserialize: Option<&DebugExpr>,
555 tag_id: StmtId,
556 networking_info: &crate::networking::NetworkingInfo,
557 );
558
559 fn create_external_source(
560 &mut self,
561 on: &LocationId,
562 source_expr: syn::Expr,
563 out_ident: &syn::Ident,
564 deserialize: Option<&DebugExpr>,
565 tag_id: StmtId,
566 );
567
568 fn create_external_output(
569 &mut self,
570 on: &LocationId,
571 sink_expr: syn::Expr,
572 input_ident: &syn::Ident,
573 serialize: Option<&DebugExpr>,
574 tag_id: StmtId,
575 );
576
577 fn emit_fold_hook(
580 &mut self,
581 location: &LocationId,
582 in_ident: &syn::Ident,
583 in_kind: &CollectionKind,
584 op_meta: &HydroIrOpMetadata,
585 ) -> Option<syn::Ident>;
586
587 fn assert_is_consistent(
591 &mut self,
592 trusted: bool,
593 location: &LocationId,
594 in_ident: syn::Ident,
595 out_ident: &syn::Ident,
596 );
597}
598
599#[cfg(feature = "build")]
600impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
601 fn singleton_intermediates(&self) -> bool {
602 false
603 }
604
605 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
606 self.entry(location.root().key())
607 .expect("location was removed")
608 .or_default()
609 }
610
611 fn batch(
612 &mut self,
613 in_ident: syn::Ident,
614 in_location: &LocationId,
615 in_kind: &CollectionKind,
616 out_ident: &syn::Ident,
617 _out_location: &LocationId,
618 _op_meta: &HydroIrOpMetadata,
619 _fold_hooked_idents: &HashSet<String>,
620 ) {
621 let builder = self.get_dfir_mut(in_location.root());
622 if in_kind.is_bounded()
623 && matches!(
624 in_kind,
625 CollectionKind::Singleton { .. }
626 | CollectionKind::Optional { .. }
627 | CollectionKind::KeyedSingleton { .. }
628 )
629 {
630 assert!(in_location.is_top_level());
631 builder.add_dfir(
632 parse_quote! {
633 #out_ident = #in_ident -> persist::<'static>();
634 },
635 None,
636 None,
637 );
638 } else {
639 builder.add_dfir(
640 parse_quote! {
641 #out_ident = #in_ident;
642 },
643 None,
644 None,
645 );
646 }
647 }
648
649 fn yield_from_tick(
650 &mut self,
651 in_ident: syn::Ident,
652 in_location: &LocationId,
653 _in_kind: &CollectionKind,
654 out_ident: &syn::Ident,
655 _out_location: &LocationId,
656 ) {
657 let builder = self.get_dfir_mut(in_location.root());
658 builder.add_dfir(
659 parse_quote! {
660 #out_ident = #in_ident;
661 },
662 None,
663 None,
664 );
665 }
666
667 fn begin_atomic(
668 &mut self,
669 in_ident: syn::Ident,
670 in_location: &LocationId,
671 _in_kind: &CollectionKind,
672 out_ident: &syn::Ident,
673 _out_location: &LocationId,
674 _op_meta: &HydroIrOpMetadata,
675 ) {
676 let builder = self.get_dfir_mut(in_location.root());
677 builder.add_dfir(
678 parse_quote! {
679 #out_ident = #in_ident;
680 },
681 None,
682 None,
683 );
684 }
685
686 fn end_atomic(
687 &mut self,
688 in_ident: syn::Ident,
689 in_location: &LocationId,
690 _in_kind: &CollectionKind,
691 out_ident: &syn::Ident,
692 ) {
693 let builder = self.get_dfir_mut(in_location.root());
694 builder.add_dfir(
695 parse_quote! {
696 #out_ident = #in_ident;
697 },
698 None,
699 None,
700 );
701 }
702
703 fn observe_nondet(
704 &mut self,
705 _trusted: bool,
706 location: &LocationId,
707 in_ident: syn::Ident,
708 _in_kind: &CollectionKind,
709 out_ident: &syn::Ident,
710 _out_kind: &CollectionKind,
711 _op_meta: &HydroIrOpMetadata,
712 ) {
713 let builder = self.get_dfir_mut(location);
714 builder.add_dfir(
715 parse_quote! {
716 #out_ident = #in_ident;
717 },
718 None,
719 None,
720 );
721 }
722
723 fn merge_ordered(
724 &mut self,
725 location: &LocationId,
726 first_ident: syn::Ident,
727 second_ident: syn::Ident,
728 out_ident: &syn::Ident,
729 _in_kind: &CollectionKind,
730 _op_meta: &HydroIrOpMetadata,
731 operator_tag: Option<&str>,
732 ) {
733 let builder = self.get_dfir_mut(location);
734 builder.add_dfir(
735 parse_quote! {
736 #out_ident = union();
737 #first_ident -> [0]#out_ident;
738 #second_ident -> [1]#out_ident;
739 },
740 None,
741 operator_tag,
742 );
743 }
744
745 fn create_network(
746 &mut self,
747 from: &LocationId,
748 to: &LocationId,
749 input_ident: syn::Ident,
750 out_ident: &syn::Ident,
751 serialize: Option<&DebugExpr>,
752 sink: syn::Expr,
753 source: syn::Expr,
754 deserialize: Option<&DebugExpr>,
755 tag_id: StmtId,
756 _networking_info: &crate::networking::NetworkingInfo,
757 ) {
758 let sender_builder = self.get_dfir_mut(from);
759 if let Some(serialize_pipeline) = serialize {
760 sender_builder.add_dfir(
761 parse_quote! {
762 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
763 },
764 None,
765 Some(&format!("send{}", tag_id)),
767 );
768 } else {
769 sender_builder.add_dfir(
770 parse_quote! {
771 #input_ident -> dest_sink(#sink);
772 },
773 None,
774 Some(&format!("send{}", tag_id)),
775 );
776 }
777
778 let receiver_builder = self.get_dfir_mut(to);
779 if let Some(deserialize_pipeline) = deserialize {
780 receiver_builder.add_dfir(
781 parse_quote! {
782 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
783 },
784 None,
785 Some(&format!("recv{}", tag_id)),
786 );
787 } else {
788 receiver_builder.add_dfir(
789 parse_quote! {
790 #out_ident = source_stream(#source);
791 },
792 None,
793 Some(&format!("recv{}", tag_id)),
794 );
795 }
796 }
797
798 fn create_external_source(
799 &mut self,
800 on: &LocationId,
801 source_expr: syn::Expr,
802 out_ident: &syn::Ident,
803 deserialize: Option<&DebugExpr>,
804 tag_id: StmtId,
805 ) {
806 let receiver_builder = self.get_dfir_mut(on);
807 if let Some(deserialize_pipeline) = deserialize {
808 receiver_builder.add_dfir(
809 parse_quote! {
810 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
811 },
812 None,
813 Some(&format!("recv{}", tag_id)),
814 );
815 } else {
816 receiver_builder.add_dfir(
817 parse_quote! {
818 #out_ident = source_stream(#source_expr);
819 },
820 None,
821 Some(&format!("recv{}", tag_id)),
822 );
823 }
824 }
825
826 fn create_external_output(
827 &mut self,
828 on: &LocationId,
829 sink_expr: syn::Expr,
830 input_ident: &syn::Ident,
831 serialize: Option<&DebugExpr>,
832 tag_id: StmtId,
833 ) {
834 let sender_builder = self.get_dfir_mut(on);
835 if let Some(serialize_fn) = serialize {
836 sender_builder.add_dfir(
837 parse_quote! {
838 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
839 },
840 None,
841 Some(&format!("send{}", tag_id)),
843 );
844 } else {
845 sender_builder.add_dfir(
846 parse_quote! {
847 #input_ident -> dest_sink(#sink_expr);
848 },
849 None,
850 Some(&format!("send{}", tag_id)),
851 );
852 }
853 }
854
855 fn emit_fold_hook(
856 &mut self,
857 _location: &LocationId,
858 _in_ident: &syn::Ident,
859 _in_kind: &CollectionKind,
860 _op_meta: &HydroIrOpMetadata,
861 ) -> Option<syn::Ident> {
862 None
863 }
864
865 fn assert_is_consistent(
866 &mut self,
867 _trusted: bool,
868 location: &LocationId,
869 in_ident: syn::Ident,
870 out_ident: &syn::Ident,
871 ) {
872 let builder = self.get_dfir_mut(location);
873 builder.add_dfir(
874 parse_quote! {
875 #out_ident = #in_ident;
876 },
877 None,
878 None,
879 );
880 }
881}
882
883#[cfg(feature = "build")]
884pub enum BuildersOrCallback<'a, L, N>
885where
886 L: FnMut(&mut HydroRoot, &mut StmtId),
887 N: FnMut(&mut HydroNode, &mut StmtId),
888{
889 Builders(&'a mut dyn DfirBuilder),
890 Callback(L, N),
891}
892
893#[derive(Debug, Hash, serde::Serialize)]
897pub enum HydroRoot {
898 ForEach {
899 f: ClosureExpr,
900 input: Box<HydroNode>,
901 op_metadata: HydroIrOpMetadata,
902 },
903 SendExternal {
904 to_external_key: LocationKey,
905 to_port_id: ExternalPortId,
906 to_many: bool,
907 unpaired: bool,
908 serialize_fn: Option<DebugExpr>,
909 instantiate_fn: DebugInstantiate,
910 input: Box<HydroNode>,
911 op_metadata: HydroIrOpMetadata,
912 },
913 DestSink {
914 sink: DebugExpr,
915 input: Box<HydroNode>,
916 op_metadata: HydroIrOpMetadata,
917 },
918 CycleSink {
919 cycle_id: CycleId,
920 input: Box<HydroNode>,
921 op_metadata: HydroIrOpMetadata,
922 },
923 EmbeddedOutput {
924 #[serde(serialize_with = "serialize_ident")]
925 ident: syn::Ident,
926 input: Box<HydroNode>,
927 op_metadata: HydroIrOpMetadata,
928 },
929 Null {
930 input: Box<HydroNode>,
931 op_metadata: HydroIrOpMetadata,
932 },
933}
934
935impl HydroRoot {
936 #[cfg(feature = "build")]
937 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
938 pub fn compile_network<'a, D>(
939 &mut self,
940 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
941 seen_tees: &mut SeenSharedNodes,
942 seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
943 processes: &SparseSecondaryMap<LocationKey, D::Process>,
944 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
945 externals: &SparseSecondaryMap<LocationKey, D::External>,
946 env: &mut D::InstantiateEnv,
947 ) where
948 D: Deploy<'a>,
949 {
950 let refcell_extra_stmts = RefCell::new(extra_stmts);
951 let refcell_env = RefCell::new(env);
952 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
953 self.transform_bottom_up(
954 &mut |l| {
955 if let HydroRoot::SendExternal {
956 input,
957 to_external_key,
958 to_port_id,
959 to_many,
960 unpaired,
961 instantiate_fn,
962 ..
963 } = l
964 {
965 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
966 DebugInstantiate::Building => {
967 let to_node = externals
968 .get(*to_external_key)
969 .unwrap_or_else(|| {
970 panic!("A external used in the graph was not instantiated: {}", to_external_key)
971 })
972 .clone();
973
974 match input.metadata().location_id.root() {
975 &LocationId::Process(process_key) => {
976 if *to_many {
977 (
978 (
979 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
980 parse_quote!(DUMMY),
981 ),
982 Box::new(|| {}) as Box<dyn FnOnce()>,
983 )
984 } else {
985 let from_node = processes
986 .get(process_key)
987 .unwrap_or_else(|| {
988 panic!("A process used in the graph was not instantiated: {}", process_key)
989 })
990 .clone();
991
992 let sink_port = from_node.next_port();
993 let source_port = to_node.next_port();
994
995 if *unpaired {
996 use stageleft::quote_type;
997 use tokio_util::codec::LengthDelimitedCodec;
998
999 to_node.register(*to_port_id, source_port.clone());
1000
1001 let _ = D::e2o_source(
1002 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1003 &to_node, &source_port,
1004 &from_node, &sink_port,
1005 "e_type::<LengthDelimitedCodec>(),
1006 format!("{}_{}", *to_external_key, *to_port_id)
1007 );
1008 }
1009
1010 (
1011 (
1012 D::o2e_sink(
1013 &from_node,
1014 &sink_port,
1015 &to_node,
1016 &source_port,
1017 format!("{}_{}", *to_external_key, *to_port_id)
1018 ),
1019 parse_quote!(DUMMY),
1020 ),
1021 if *unpaired {
1022 D::e2o_connect(
1023 &to_node,
1024 &source_port,
1025 &from_node,
1026 &sink_port,
1027 *to_many,
1028 NetworkHint::Auto,
1029 )
1030 } else {
1031 Box::new(|| {}) as Box<dyn FnOnce()>
1032 },
1033 )
1034 }
1035 }
1036 LocationId::Cluster(cluster_key) => {
1037 let from_node = clusters
1038 .get(*cluster_key)
1039 .unwrap_or_else(|| {
1040 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1041 })
1042 .clone();
1043
1044 let sink_port = from_node.next_port();
1045 let source_port = to_node.next_port();
1046
1047 if *unpaired {
1048 to_node.register(*to_port_id, source_port.clone());
1049 }
1050
1051 (
1052 (
1053 D::m2e_sink(
1054 &from_node,
1055 &sink_port,
1056 &to_node,
1057 &source_port,
1058 format!("{}_{}", *to_external_key, *to_port_id)
1059 ),
1060 parse_quote!(DUMMY),
1061 ),
1062 Box::new(|| {}) as Box<dyn FnOnce()>,
1063 )
1064 }
1065 _ => panic!()
1066 }
1067 },
1068
1069 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1070 };
1071
1072 *instantiate_fn = DebugInstantiateFinalized {
1073 sink: sink_expr,
1074 source: source_expr,
1075 connect_fn: Some(connect_fn),
1076 }
1077 .into();
1078 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1079 let element_type = match &input.metadata().collection_kind {
1080 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1081 _ => panic!("Embedded output must have Stream collection kind"),
1082 };
1083 let location_key = match input.metadata().location_id.root() {
1084 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1085 _ => panic!("Embedded output must be on a process or cluster"),
1086 };
1087 D::register_embedded_output(
1088 &mut refcell_env.borrow_mut(),
1089 location_key,
1090 ident,
1091 &element_type,
1092 );
1093 }
1094 },
1095 &mut |n| {
1096 if let HydroNode::Network {
1097 name,
1098 networking_info,
1099 input,
1100 instantiate_fn,
1101 metadata,
1102 ..
1103 } = n
1104 {
1105 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1106 DebugInstantiate::Building => instantiate_network::<D>(
1107 &mut refcell_env.borrow_mut(),
1108 input.metadata().location_id.root(),
1109 metadata.location_id.root(),
1110 processes,
1111 clusters,
1112 name.as_deref(),
1113 networking_info,
1114 ),
1115
1116 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1117 };
1118
1119 *instantiate_fn = DebugInstantiateFinalized {
1120 sink: sink_expr,
1121 source: source_expr,
1122 connect_fn: Some(connect_fn),
1123 }
1124 .into();
1125 } else if let HydroNode::ExternalInput {
1126 from_external_key,
1127 from_port_id,
1128 from_many,
1129 codec_type,
1130 port_hint,
1131 instantiate_fn,
1132 metadata,
1133 ..
1134 } = n
1135 {
1136 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1137 DebugInstantiate::Building => {
1138 let from_node = externals
1139 .get(*from_external_key)
1140 .unwrap_or_else(|| {
1141 panic!(
1142 "A external used in the graph was not instantiated: {}",
1143 from_external_key,
1144 )
1145 })
1146 .clone();
1147
1148 match metadata.location_id.root() {
1149 &LocationId::Process(process_key) => {
1150 let to_node = processes
1151 .get(process_key)
1152 .unwrap_or_else(|| {
1153 panic!("A process used in the graph was not instantiated: {}", process_key)
1154 })
1155 .clone();
1156
1157 let sink_port = from_node.next_port();
1158 let source_port = to_node.next_port();
1159
1160 from_node.register(*from_port_id, sink_port.clone());
1161
1162 (
1163 (
1164 parse_quote!(DUMMY),
1165 if *from_many {
1166 D::e2o_many_source(
1167 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1168 &to_node, &source_port,
1169 codec_type.0.as_ref(),
1170 format!("{}_{}", *from_external_key, *from_port_id)
1171 )
1172 } else {
1173 D::e2o_source(
1174 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1175 &from_node, &sink_port,
1176 &to_node, &source_port,
1177 codec_type.0.as_ref(),
1178 format!("{}_{}", *from_external_key, *from_port_id)
1179 )
1180 },
1181 ),
1182 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1183 )
1184 }
1185 LocationId::Cluster(cluster_key) => {
1186 let to_node = clusters
1187 .get(*cluster_key)
1188 .unwrap_or_else(|| {
1189 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1190 })
1191 .clone();
1192
1193 let sink_port = from_node.next_port();
1194 let source_port = to_node.next_port();
1195
1196 from_node.register(*from_port_id, sink_port.clone());
1197
1198 (
1199 (
1200 parse_quote!(DUMMY),
1201 D::e2m_source(
1202 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1203 &from_node, &sink_port,
1204 &to_node, &source_port,
1205 codec_type.0.as_ref(),
1206 format!("{}_{}", *from_external_key, *from_port_id)
1207 ),
1208 ),
1209 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1210 )
1211 }
1212 _ => panic!()
1213 }
1214 },
1215
1216 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1217 };
1218
1219 *instantiate_fn = DebugInstantiateFinalized {
1220 sink: sink_expr,
1221 source: source_expr,
1222 connect_fn: Some(connect_fn),
1223 }
1224 .into();
1225 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1226 let element_type = match &metadata.collection_kind {
1227 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1228 _ => panic!("Embedded source must have Stream collection kind"),
1229 };
1230 let location_key = match metadata.location_id.root() {
1231 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1232 _ => panic!("Embedded source must be on a process or cluster"),
1233 };
1234 D::register_embedded_stream_input(
1235 &mut refcell_env.borrow_mut(),
1236 location_key,
1237 ident,
1238 &element_type,
1239 );
1240 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1241 let element_type = match &metadata.collection_kind {
1242 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1243 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1244 };
1245 let location_key = match metadata.location_id.root() {
1246 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1247 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1248 };
1249 D::register_embedded_singleton_input(
1250 &mut refcell_env.borrow_mut(),
1251 location_key,
1252 ident,
1253 &element_type,
1254 );
1255 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1256 match state {
1257 ClusterMembersState::Uninit => {
1258 let at_location = metadata.location_id.root().clone();
1259 let key = (at_location.clone(), location_id.key());
1260 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1261 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1263 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1264 &(),
1265 );
1266 *state = ClusterMembersState::Stream(expr.into());
1267 } else {
1268 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1270 }
1271 }
1272 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1273 panic!("cluster members already finalized");
1274 }
1275 }
1276 }
1277 },
1278 seen_tees,
1279 false,
1280 );
1281 }
1282
1283 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1284 self.transform_bottom_up(
1285 &mut |l| {
1286 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1287 match instantiate_fn {
1288 DebugInstantiate::Building => panic!("network not built"),
1289
1290 DebugInstantiate::Finalized(finalized) => {
1291 (finalized.connect_fn.take().unwrap())();
1292 }
1293 }
1294 }
1295 },
1296 &mut |n| {
1297 if let HydroNode::Network { instantiate_fn, .. }
1298 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1299 {
1300 match instantiate_fn {
1301 DebugInstantiate::Building => panic!("network not built"),
1302
1303 DebugInstantiate::Finalized(finalized) => {
1304 (finalized.connect_fn.take().unwrap())();
1305 }
1306 }
1307 }
1308 },
1309 seen_tees,
1310 false,
1311 );
1312 }
1313
1314 pub fn transform_bottom_up(
1315 &mut self,
1316 transform_root: &mut impl FnMut(&mut HydroRoot),
1317 transform_node: &mut impl FnMut(&mut HydroNode),
1318 seen_tees: &mut SeenSharedNodes,
1319 check_well_formed: bool,
1320 ) {
1321 self.transform_children(
1322 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1323 seen_tees,
1324 );
1325
1326 transform_root(self);
1327 }
1328
1329 pub fn transform_children(
1330 &mut self,
1331 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1332 seen_tees: &mut SeenSharedNodes,
1333 ) {
1334 match self {
1335 HydroRoot::ForEach { f, input, .. } => {
1336 f.transform_children(&mut transform, seen_tees);
1337 transform(input, seen_tees);
1338 }
1339 HydroRoot::SendExternal { input, .. }
1340 | HydroRoot::DestSink { input, .. }
1341 | HydroRoot::CycleSink { input, .. }
1342 | HydroRoot::EmbeddedOutput { input, .. }
1343 | HydroRoot::Null { input, .. } => {
1344 transform(input, seen_tees);
1345 }
1346 }
1347 }
1348
1349 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1350 match self {
1351 HydroRoot::ForEach {
1352 f,
1353 input,
1354 op_metadata,
1355 } => HydroRoot::ForEach {
1356 f: f.deep_clone(seen_tees),
1357 input: Box::new(input.deep_clone(seen_tees)),
1358 op_metadata: op_metadata.clone(),
1359 },
1360 HydroRoot::SendExternal {
1361 to_external_key,
1362 to_port_id,
1363 to_many,
1364 unpaired,
1365 serialize_fn,
1366 instantiate_fn,
1367 input,
1368 op_metadata,
1369 } => HydroRoot::SendExternal {
1370 to_external_key: *to_external_key,
1371 to_port_id: *to_port_id,
1372 to_many: *to_many,
1373 unpaired: *unpaired,
1374 serialize_fn: serialize_fn.clone(),
1375 instantiate_fn: instantiate_fn.clone(),
1376 input: Box::new(input.deep_clone(seen_tees)),
1377 op_metadata: op_metadata.clone(),
1378 },
1379 HydroRoot::DestSink {
1380 sink,
1381 input,
1382 op_metadata,
1383 } => HydroRoot::DestSink {
1384 sink: sink.clone(),
1385 input: Box::new(input.deep_clone(seen_tees)),
1386 op_metadata: op_metadata.clone(),
1387 },
1388 HydroRoot::CycleSink {
1389 cycle_id,
1390 input,
1391 op_metadata,
1392 } => HydroRoot::CycleSink {
1393 cycle_id: *cycle_id,
1394 input: Box::new(input.deep_clone(seen_tees)),
1395 op_metadata: op_metadata.clone(),
1396 },
1397 HydroRoot::EmbeddedOutput {
1398 ident,
1399 input,
1400 op_metadata,
1401 } => HydroRoot::EmbeddedOutput {
1402 ident: ident.clone(),
1403 input: Box::new(input.deep_clone(seen_tees)),
1404 op_metadata: op_metadata.clone(),
1405 },
1406 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1407 input: Box::new(input.deep_clone(seen_tees)),
1408 op_metadata: op_metadata.clone(),
1409 },
1410 }
1411 }
1412
1413 #[cfg(feature = "build")]
1414 pub fn emit(
1415 &mut self,
1416 graph_builders: &mut dyn DfirBuilder,
1417 seen_tees: &mut SeenSharedNodes,
1418 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1419 next_stmt_id: &mut StmtId,
1420 fold_hooked_idents: &mut HashSet<String>,
1421 ) {
1422 self.emit_core(
1423 &mut BuildersOrCallback::<
1424 fn(&mut HydroRoot, &mut StmtId),
1425 fn(&mut HydroNode, &mut StmtId),
1426 >::Builders(graph_builders),
1427 seen_tees,
1428 built_tees,
1429 next_stmt_id,
1430 fold_hooked_idents,
1431 );
1432 }
1433
1434 #[cfg(feature = "build")]
1435 pub fn emit_core(
1436 &mut self,
1437 builders_or_callback: &mut BuildersOrCallback<
1438 impl FnMut(&mut HydroRoot, &mut StmtId),
1439 impl FnMut(&mut HydroNode, &mut StmtId),
1440 >,
1441 seen_tees: &mut SeenSharedNodes,
1442 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1443 next_stmt_id: &mut StmtId,
1444 fold_hooked_idents: &mut HashSet<String>,
1445 ) {
1446 match self {
1447 HydroRoot::ForEach { f, input, .. } => {
1448 let input_ident = input.emit_core(
1449 builders_or_callback,
1450 seen_tees,
1451 built_tees,
1452 next_stmt_id,
1453 fold_hooked_idents,
1454 );
1455
1456 match builders_or_callback {
1457 BuildersOrCallback::Builders(graph_builders) => {
1458 let mut ident_stack: Vec<syn::Ident> = Vec::new();
1459 let mut singleton_access_counters: HashMap<*const RefCell<HydroNode>, u32> =
1460 HashMap::new();
1461
1462 for (ref_node, _is_mut) in f.singleton_refs.iter() {
1464 let HydroNode::Reference { inner, .. } = ref_node else {
1465 panic!("singleton_refs should only contain HydroNode::Reference");
1466 };
1467 let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
1468 let idents = built_tees.get(&ptr).expect(
1469 "ForEach singleton ref not found in built_tees — ref node was not emitted",
1470 );
1471 ident_stack.push(idents[0].clone());
1472 }
1473
1474 let f_tokens =
1475 f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
1476
1477 graph_builders
1478 .get_dfir_mut(&input.metadata().location_id)
1479 .add_dfir(
1480 parse_quote! {
1481 #input_ident -> for_each(#f_tokens);
1482 },
1483 None,
1484 Some(&next_stmt_id.to_string()),
1485 );
1486 }
1487 BuildersOrCallback::Callback(leaf_callback, _) => {
1488 leaf_callback(self, next_stmt_id);
1489 }
1490 }
1491
1492 let _ = next_stmt_id.get_and_increment();
1493 }
1494
1495 HydroRoot::SendExternal {
1496 serialize_fn,
1497 instantiate_fn,
1498 input,
1499 ..
1500 } => {
1501 let input_ident = input.emit_core(
1502 builders_or_callback,
1503 seen_tees,
1504 built_tees,
1505 next_stmt_id,
1506 fold_hooked_idents,
1507 );
1508
1509 match builders_or_callback {
1510 BuildersOrCallback::Builders(graph_builders) => {
1511 let (sink_expr, _) = match instantiate_fn {
1512 DebugInstantiate::Building => (
1513 syn::parse_quote!(DUMMY_SINK),
1514 syn::parse_quote!(DUMMY_SOURCE),
1515 ),
1516
1517 DebugInstantiate::Finalized(finalized) => {
1518 (finalized.sink.clone(), finalized.source.clone())
1519 }
1520 };
1521
1522 graph_builders.create_external_output(
1523 &input.metadata().location_id,
1524 sink_expr,
1525 &input_ident,
1526 serialize_fn.as_ref(),
1527 *next_stmt_id,
1528 );
1529 }
1530 BuildersOrCallback::Callback(leaf_callback, _) => {
1531 leaf_callback(self, next_stmt_id);
1532 }
1533 }
1534
1535 let _ = next_stmt_id.get_and_increment();
1536 }
1537
1538 HydroRoot::DestSink { sink, input, .. } => {
1539 let input_ident = input.emit_core(
1540 builders_or_callback,
1541 seen_tees,
1542 built_tees,
1543 next_stmt_id,
1544 fold_hooked_idents,
1545 );
1546
1547 match builders_or_callback {
1548 BuildersOrCallback::Builders(graph_builders) => {
1549 graph_builders
1550 .get_dfir_mut(&input.metadata().location_id)
1551 .add_dfir(
1552 parse_quote! {
1553 #input_ident -> dest_sink(#sink);
1554 },
1555 None,
1556 Some(&next_stmt_id.to_string()),
1557 );
1558 }
1559 BuildersOrCallback::Callback(leaf_callback, _) => {
1560 leaf_callback(self, next_stmt_id);
1561 }
1562 }
1563
1564 let _ = next_stmt_id.get_and_increment();
1565 }
1566
1567 HydroRoot::CycleSink {
1568 cycle_id, input, ..
1569 } => {
1570 let input_ident = input.emit_core(
1571 builders_or_callback,
1572 seen_tees,
1573 built_tees,
1574 next_stmt_id,
1575 fold_hooked_idents,
1576 );
1577
1578 match builders_or_callback {
1579 BuildersOrCallback::Builders(graph_builders) => {
1580 let elem_type: syn::Type = match &input.metadata().collection_kind {
1581 CollectionKind::KeyedSingleton {
1582 key_type,
1583 value_type,
1584 ..
1585 }
1586 | CollectionKind::KeyedStream {
1587 key_type,
1588 value_type,
1589 ..
1590 } => {
1591 parse_quote!((#key_type, #value_type))
1592 }
1593 CollectionKind::Stream { element_type, .. }
1594 | CollectionKind::Singleton { element_type, .. }
1595 | CollectionKind::Optional { element_type, .. } => {
1596 parse_quote!(#element_type)
1597 }
1598 };
1599
1600 let cycle_id_ident = cycle_id.as_ident();
1601 graph_builders
1602 .get_dfir_mut(&input.metadata().location_id)
1603 .add_dfir(
1604 parse_quote! {
1605 #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1606 },
1607 None,
1608 None,
1609 );
1610 }
1611 BuildersOrCallback::Callback(_, _) => {}
1613 }
1614 }
1615
1616 HydroRoot::EmbeddedOutput { ident, input, .. } => {
1617 let input_ident = input.emit_core(
1618 builders_or_callback,
1619 seen_tees,
1620 built_tees,
1621 next_stmt_id,
1622 fold_hooked_idents,
1623 );
1624
1625 match builders_or_callback {
1626 BuildersOrCallback::Builders(graph_builders) => {
1627 graph_builders
1628 .get_dfir_mut(&input.metadata().location_id)
1629 .add_dfir(
1630 parse_quote! {
1631 #input_ident -> for_each(&mut #ident);
1632 },
1633 None,
1634 Some(&next_stmt_id.to_string()),
1635 );
1636 }
1637 BuildersOrCallback::Callback(leaf_callback, _) => {
1638 leaf_callback(self, next_stmt_id);
1639 }
1640 }
1641
1642 let _ = next_stmt_id.get_and_increment();
1643 }
1644
1645 HydroRoot::Null { input, .. } => {
1646 let input_ident = input.emit_core(
1647 builders_or_callback,
1648 seen_tees,
1649 built_tees,
1650 next_stmt_id,
1651 fold_hooked_idents,
1652 );
1653
1654 match builders_or_callback {
1655 BuildersOrCallback::Builders(graph_builders) => {
1656 graph_builders
1657 .get_dfir_mut(&input.metadata().location_id)
1658 .add_dfir(
1659 parse_quote! {
1660 #input_ident -> for_each(|_| {});
1661 },
1662 None,
1663 Some(&next_stmt_id.to_string()),
1664 );
1665 }
1666 BuildersOrCallback::Callback(leaf_callback, _) => {
1667 leaf_callback(self, next_stmt_id);
1668 }
1669 }
1670
1671 let _ = next_stmt_id.get_and_increment();
1672 }
1673 }
1674 }
1675
1676 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1677 match self {
1678 HydroRoot::ForEach { op_metadata, .. }
1679 | HydroRoot::SendExternal { op_metadata, .. }
1680 | HydroRoot::DestSink { op_metadata, .. }
1681 | HydroRoot::CycleSink { op_metadata, .. }
1682 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1683 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1684 }
1685 }
1686
1687 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1688 match self {
1689 HydroRoot::ForEach { op_metadata, .. }
1690 | HydroRoot::SendExternal { op_metadata, .. }
1691 | HydroRoot::DestSink { op_metadata, .. }
1692 | HydroRoot::CycleSink { op_metadata, .. }
1693 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1694 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1695 }
1696 }
1697
1698 pub fn input(&self) -> &HydroNode {
1699 match self {
1700 HydroRoot::ForEach { input, .. }
1701 | HydroRoot::SendExternal { input, .. }
1702 | HydroRoot::DestSink { input, .. }
1703 | HydroRoot::CycleSink { input, .. }
1704 | HydroRoot::EmbeddedOutput { input, .. }
1705 | HydroRoot::Null { input, .. } => input,
1706 }
1707 }
1708
1709 pub fn input_metadata(&self) -> &HydroIrMetadata {
1710 self.input().metadata()
1711 }
1712
1713 pub fn print_root(&self) -> String {
1714 match self {
1715 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1716 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1717 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1718 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1719 HydroRoot::EmbeddedOutput { ident, .. } => {
1720 format!("EmbeddedOutput({})", ident)
1721 }
1722 HydroRoot::Null { .. } => "Null".to_owned(),
1723 }
1724 }
1725
1726 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1727 match self {
1728 HydroRoot::ForEach { f, .. } => {
1729 transform(&mut f.expr);
1730 }
1731 HydroRoot::DestSink { sink, .. } => {
1732 transform(sink);
1733 }
1734 HydroRoot::SendExternal { .. }
1735 | HydroRoot::CycleSink { .. }
1736 | HydroRoot::EmbeddedOutput { .. }
1737 | HydroRoot::Null { .. } => {}
1738 }
1739 }
1740}
1741
1742#[cfg(feature = "build")]
1743fn tick_of(loc: &LocationId) -> Option<ClockId> {
1744 match loc {
1745 LocationId::Tick(id, _) => Some(*id),
1746 LocationId::Atomic(inner) => tick_of(inner),
1747 _ => None,
1748 }
1749}
1750
1751#[cfg(feature = "build")]
1752fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1753 match loc {
1754 LocationId::Tick(id, inner) => {
1755 *id = uf_find(uf, *id);
1756 remap_location(inner, uf);
1757 }
1758 LocationId::Atomic(inner) => {
1759 remap_location(inner, uf);
1760 }
1761 LocationId::Process(_) | LocationId::Cluster(_) => {}
1762 }
1763}
1764
1765#[cfg(feature = "build")]
1766fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1767 let p = *parent.get(&x).unwrap_or(&x);
1768 if p == x {
1769 return x;
1770 }
1771 let root = uf_find(parent, p);
1772 parent.insert(x, root);
1773 root
1774}
1775
1776#[cfg(feature = "build")]
1777fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1778 let ra = uf_find(parent, a);
1779 let rb = uf_find(parent, b);
1780 if ra != rb {
1781 parent.insert(ra, rb);
1782 }
1783}
1784
1785#[cfg(feature = "build")]
1789pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1790 let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1791
1792 transform_bottom_up(
1794 ir,
1795 &mut |_| {},
1796 &mut |node: &mut HydroNode| match node {
1797 HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
1798 if let (Some(a), Some(b)) = (
1799 tick_of(&inner.metadata().location_id),
1800 tick_of(&metadata.location_id),
1801 ) {
1802 uf_union(&mut uf, a, b);
1803 }
1804 }
1805 HydroNode::Chain {
1806 first,
1807 second,
1808 metadata,
1809 }
1810 | HydroNode::ChainFirst {
1811 first,
1812 second,
1813 metadata,
1814 }
1815 | HydroNode::MergeOrdered {
1816 first,
1817 second,
1818 metadata,
1819 } => {
1820 if let (Some(a), Some(b)) = (
1821 tick_of(&first.metadata().location_id),
1822 tick_of(&metadata.location_id),
1823 ) {
1824 uf_union(&mut uf, a, b);
1825 }
1826 if let (Some(a), Some(b)) = (
1827 tick_of(&second.metadata().location_id),
1828 tick_of(&metadata.location_id),
1829 ) {
1830 uf_union(&mut uf, a, b);
1831 }
1832 }
1833 _ => {}
1834 },
1835 false,
1836 );
1837
1838 transform_bottom_up(
1840 ir,
1841 &mut |_| {},
1842 &mut |node: &mut HydroNode| {
1843 remap_location(&mut node.metadata_mut().location_id, &mut uf);
1844 },
1845 false,
1846 );
1847}
1848
1849#[cfg(feature = "build")]
1850pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1851 let mut builders = SecondaryMap::new();
1852 let mut seen_tees = HashMap::new();
1853 let mut built_tees = HashMap::new();
1854 let mut next_stmt_id = StmtId::default();
1855 let mut fold_hooked_idents = HashSet::new();
1856 for leaf in ir {
1857 leaf.emit(
1858 &mut builders,
1859 &mut seen_tees,
1860 &mut built_tees,
1861 &mut next_stmt_id,
1862 &mut fold_hooked_idents,
1863 );
1864 }
1865 builders
1866}
1867
1868#[cfg(feature = "build")]
1869pub fn traverse_dfir(
1870 ir: &mut [HydroRoot],
1871 transform_root: impl FnMut(&mut HydroRoot, &mut StmtId),
1872 transform_node: impl FnMut(&mut HydroNode, &mut StmtId),
1873) {
1874 let mut seen_tees = HashMap::new();
1875 let mut built_tees = HashMap::new();
1876 let mut next_stmt_id = StmtId::default();
1877 let mut fold_hooked_idents = HashSet::new();
1878 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1879 ir.iter_mut().for_each(|leaf| {
1880 leaf.emit_core(
1881 &mut callback,
1882 &mut seen_tees,
1883 &mut built_tees,
1884 &mut next_stmt_id,
1885 &mut fold_hooked_idents,
1886 );
1887 });
1888}
1889
1890pub fn transform_bottom_up(
1891 ir: &mut [HydroRoot],
1892 transform_root: &mut impl FnMut(&mut HydroRoot),
1893 transform_node: &mut impl FnMut(&mut HydroNode),
1894 check_well_formed: bool,
1895) {
1896 let mut seen_tees = HashMap::new();
1897 ir.iter_mut().for_each(|leaf| {
1898 leaf.transform_bottom_up(
1899 transform_root,
1900 transform_node,
1901 &mut seen_tees,
1902 check_well_formed,
1903 );
1904 });
1905}
1906
1907pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1908 let mut seen_tees = HashMap::new();
1909 ir.iter()
1910 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1911 .collect()
1912}
1913
1914type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1915thread_local! {
1916 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1917 static SERIALIZED_SHARED: PrintedTees
1921 = const { RefCell::new(None) };
1922}
1923
1924pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1925 PRINTED_TEES.with(|printed_tees| {
1926 let mut printed_tees_mut = printed_tees.borrow_mut();
1927 *printed_tees_mut = Some((0, HashMap::new()));
1928 drop(printed_tees_mut);
1929
1930 let ret = f();
1931
1932 let mut printed_tees_mut = printed_tees.borrow_mut();
1933 *printed_tees_mut = None;
1934
1935 ret
1936 })
1937}
1938
1939pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1944 let _guard = SerializedSharedGuard::enter();
1945 f()
1946}
1947
1948struct SerializedSharedGuard {
1951 previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1952}
1953
1954impl SerializedSharedGuard {
1955 fn enter() -> Self {
1956 let previous = SERIALIZED_SHARED.with(|cell| {
1957 let mut guard = cell.borrow_mut();
1958 guard.replace((0, HashMap::new()))
1959 });
1960 Self { previous }
1961 }
1962}
1963
1964impl Drop for SerializedSharedGuard {
1965 fn drop(&mut self) {
1966 SERIALIZED_SHARED.with(|cell| {
1967 *cell.borrow_mut() = self.previous.take();
1968 });
1969 }
1970}
1971
1972pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1973
1974impl serde::Serialize for SharedNode {
1975 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
1986 SERIALIZED_SHARED.with(|cell| {
1987 let mut guard = cell.borrow_mut();
1988 let state = guard.as_mut().ok_or_else(|| {
1990 serde::ser::Error::custom(
1991 "SharedNode serialization requires an active serialize_dedup_shared scope",
1992 )
1993 })?;
1994 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
1995
1996 if let Some(&id) = state.1.get(&ptr) {
1997 drop(guard);
1998 use serde::ser::SerializeMap;
1999 let mut map = serializer.serialize_map(Some(1))?;
2000 map.serialize_entry("$shared_ref", &id)?;
2001 map.end()
2002 } else {
2003 let id = state.0;
2004 state.0 += 1;
2005 state.1.insert(ptr, id);
2006 drop(guard);
2007
2008 use serde::ser::SerializeMap;
2009 let mut map = serializer.serialize_map(Some(2))?;
2010 map.serialize_entry("$shared", &id)?;
2011 map.serialize_entry("node", &*self.0.borrow())?;
2012 map.end()
2013 }
2014 })
2015 }
2016}
2017
2018impl SharedNode {
2019 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2020 Rc::as_ptr(&self.0)
2021 }
2022}
2023
2024impl Debug for SharedNode {
2025 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2026 PRINTED_TEES.with(|printed_tees| {
2027 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2028 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2029
2030 if let Some(printed_tees_mut) = printed_tees_mut {
2031 if let Some(existing) = printed_tees_mut
2032 .1
2033 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2034 {
2035 write!(f, "<shared {}>", existing)
2036 } else {
2037 let next_id = printed_tees_mut.0;
2038 printed_tees_mut.0 += 1;
2039 printed_tees_mut
2040 .1
2041 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2042 drop(printed_tees_mut_borrow);
2043 write!(f, "<shared {}>: ", next_id)?;
2044 Debug::fmt(&self.0.borrow(), f)
2045 }
2046 } else {
2047 drop(printed_tees_mut_borrow);
2048 write!(f, "<shared>: ")?;
2049 Debug::fmt(&self.0.borrow(), f)
2050 }
2051 })
2052 }
2053}
2054
2055impl Hash for SharedNode {
2056 fn hash<H: Hasher>(&self, state: &mut H) {
2057 self.0.borrow_mut().hash(state);
2058 }
2059}
2060
2061#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2062pub enum BoundKind {
2063 Unbounded,
2064 Bounded,
2065}
2066
2067#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2068pub enum StreamOrder {
2069 NoOrder,
2070 TotalOrder,
2071}
2072
2073#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2074pub enum StreamRetry {
2075 AtLeastOnce,
2076 ExactlyOnce,
2077}
2078
2079#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2080pub enum KeyedSingletonBoundKind {
2081 Unbounded,
2082 MonotonicValue,
2083 BoundedValue,
2084 Bounded,
2085}
2086
2087#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
2088pub enum SingletonBoundKind {
2089 Unbounded,
2090 Monotonic,
2091 Bounded,
2092}
2093
2094#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
2095pub enum CollectionKind {
2096 Stream {
2097 bound: BoundKind,
2098 order: StreamOrder,
2099 retry: StreamRetry,
2100 element_type: DebugType,
2101 },
2102 Singleton {
2103 bound: SingletonBoundKind,
2104 element_type: DebugType,
2105 },
2106 Optional {
2107 bound: BoundKind,
2108 element_type: DebugType,
2109 },
2110 KeyedStream {
2111 bound: BoundKind,
2112 value_order: StreamOrder,
2113 value_retry: StreamRetry,
2114 key_type: DebugType,
2115 value_type: DebugType,
2116 },
2117 KeyedSingleton {
2118 bound: KeyedSingletonBoundKind,
2119 key_type: DebugType,
2120 value_type: DebugType,
2121 },
2122}
2123
2124impl CollectionKind {
2125 pub fn is_bounded(&self) -> bool {
2126 matches!(
2127 self,
2128 CollectionKind::Stream {
2129 bound: BoundKind::Bounded,
2130 ..
2131 } | CollectionKind::Singleton {
2132 bound: SingletonBoundKind::Bounded,
2133 ..
2134 } | CollectionKind::Optional {
2135 bound: BoundKind::Bounded,
2136 ..
2137 } | CollectionKind::KeyedStream {
2138 bound: BoundKind::Bounded,
2139 ..
2140 } | CollectionKind::KeyedSingleton {
2141 bound: KeyedSingletonBoundKind::Bounded,
2142 ..
2143 }
2144 )
2145 }
2146}
2147
2148#[derive(Clone, serde::Serialize)]
2149pub struct HydroIrMetadata {
2150 pub location_id: LocationId,
2151 pub collection_kind: CollectionKind,
2152 pub consistency: Option<ClusterConsistency>,
2153 pub cardinality: Option<usize>,
2154 pub tag: Option<String>,
2155 pub op: HydroIrOpMetadata,
2156}
2157
2158impl Hash for HydroIrMetadata {
2160 fn hash<H: Hasher>(&self, _: &mut H) {}
2161}
2162
2163impl PartialEq for HydroIrMetadata {
2164 fn eq(&self, _: &Self) -> bool {
2165 true
2166 }
2167}
2168
2169impl Eq for HydroIrMetadata {}
2170
2171impl Debug for HydroIrMetadata {
2172 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2173 f.debug_struct("HydroIrMetadata")
2174 .field("location_id", &self.location_id)
2175 .field("collection_kind", &self.collection_kind)
2176 .finish()
2177 }
2178}
2179
2180#[derive(Clone, serde::Serialize)]
2183pub struct HydroIrOpMetadata {
2184 #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
2185 pub backtrace: Backtrace,
2186 pub cpu_usage: Option<f64>,
2187 pub network_recv_cpu_usage: Option<f64>,
2188 pub id: Option<usize>,
2189}
2190
2191impl HydroIrOpMetadata {
2192 #[expect(
2193 clippy::new_without_default,
2194 reason = "explicit calls to new ensure correct backtrace bounds"
2195 )]
2196 pub fn new() -> HydroIrOpMetadata {
2197 Self::new_with_skip(1)
2198 }
2199
2200 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
2201 HydroIrOpMetadata {
2202 backtrace: Backtrace::get_backtrace(2 + skip_count),
2203 cpu_usage: None,
2204 network_recv_cpu_usage: None,
2205 id: None,
2206 }
2207 }
2208}
2209
2210impl Debug for HydroIrOpMetadata {
2211 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2212 f.debug_struct("HydroIrOpMetadata").finish()
2213 }
2214}
2215
2216impl Hash for HydroIrOpMetadata {
2217 fn hash<H: Hasher>(&self, _: &mut H) {}
2218}
2219
2220#[derive(Debug, Hash, serde::Serialize)]
2223pub enum HydroNode {
2224 Placeholder,
2225
2226 Cast {
2234 inner: Box<HydroNode>,
2235 metadata: HydroIrMetadata,
2236 },
2237
2238 ObserveNonDet {
2244 inner: Box<HydroNode>,
2245 trusted: bool, metadata: HydroIrMetadata,
2247 },
2248
2249 Source {
2250 source: HydroSource,
2251 metadata: HydroIrMetadata,
2252 },
2253
2254 SingletonSource {
2255 value: DebugExpr,
2256 first_tick_only: bool,
2257 metadata: HydroIrMetadata,
2258 },
2259
2260 CycleSource {
2261 cycle_id: CycleId,
2262 metadata: HydroIrMetadata,
2263 },
2264
2265 Tee {
2266 inner: SharedNode,
2267 metadata: HydroIrMetadata,
2268 },
2269
2270 Reference {
2279 inner: SharedNode,
2280 kind: crate::handoff_ref::HandoffRefKind,
2281 metadata: HydroIrMetadata,
2282 },
2283
2284 Partition {
2285 inner: SharedNode,
2286 f: ClosureExpr,
2287 is_true: bool,
2288 metadata: HydroIrMetadata,
2289 },
2290
2291 BeginAtomic {
2292 inner: Box<HydroNode>,
2293 metadata: HydroIrMetadata,
2294 },
2295
2296 EndAtomic {
2297 inner: Box<HydroNode>,
2298 metadata: HydroIrMetadata,
2299 },
2300
2301 Batch {
2302 inner: Box<HydroNode>,
2303 metadata: HydroIrMetadata,
2304 },
2305
2306 YieldConcat {
2307 inner: Box<HydroNode>,
2308 metadata: HydroIrMetadata,
2309 },
2310
2311 Chain {
2312 first: Box<HydroNode>,
2313 second: Box<HydroNode>,
2314 metadata: HydroIrMetadata,
2315 },
2316
2317 MergeOrdered {
2318 first: Box<HydroNode>,
2319 second: Box<HydroNode>,
2320 metadata: HydroIrMetadata,
2321 },
2322
2323 ChainFirst {
2324 first: Box<HydroNode>,
2325 second: Box<HydroNode>,
2326 metadata: HydroIrMetadata,
2327 },
2328
2329 CrossProduct {
2330 left: Box<HydroNode>,
2331 right: Box<HydroNode>,
2332 metadata: HydroIrMetadata,
2333 },
2334
2335 CrossSingleton {
2336 left: Box<HydroNode>,
2337 right: Box<HydroNode>,
2338 metadata: HydroIrMetadata,
2339 },
2340
2341 Join {
2342 left: Box<HydroNode>,
2343 right: Box<HydroNode>,
2344 metadata: HydroIrMetadata,
2345 },
2346
2347 JoinHalf {
2351 left: Box<HydroNode>,
2352 right: Box<HydroNode>,
2353 metadata: HydroIrMetadata,
2354 },
2355
2356 Difference {
2357 pos: Box<HydroNode>,
2358 neg: Box<HydroNode>,
2359 metadata: HydroIrMetadata,
2360 },
2361
2362 AntiJoin {
2363 pos: Box<HydroNode>,
2364 neg: Box<HydroNode>,
2365 metadata: HydroIrMetadata,
2366 },
2367
2368 ResolveFutures {
2369 input: Box<HydroNode>,
2370 metadata: HydroIrMetadata,
2371 },
2372 ResolveFuturesBlocking {
2373 input: Box<HydroNode>,
2374 metadata: HydroIrMetadata,
2375 },
2376 ResolveFuturesOrdered {
2377 input: Box<HydroNode>,
2378 metadata: HydroIrMetadata,
2379 },
2380
2381 Map {
2382 f: ClosureExpr,
2383 input: Box<HydroNode>,
2384 metadata: HydroIrMetadata,
2385 },
2386 FlatMap {
2387 f: ClosureExpr,
2388 input: Box<HydroNode>,
2389 metadata: HydroIrMetadata,
2390 },
2391 FlatMapStreamBlocking {
2392 f: ClosureExpr,
2393 input: Box<HydroNode>,
2394 metadata: HydroIrMetadata,
2395 },
2396 Filter {
2397 f: ClosureExpr,
2398 input: Box<HydroNode>,
2399 metadata: HydroIrMetadata,
2400 },
2401 FilterMap {
2402 f: ClosureExpr,
2403 input: Box<HydroNode>,
2404 metadata: HydroIrMetadata,
2405 },
2406
2407 DeferTick {
2408 input: Box<HydroNode>,
2409 metadata: HydroIrMetadata,
2410 },
2411 Enumerate {
2412 input: Box<HydroNode>,
2413 metadata: HydroIrMetadata,
2414 },
2415 Inspect {
2416 f: ClosureExpr,
2417 input: Box<HydroNode>,
2418 metadata: HydroIrMetadata,
2419 },
2420
2421 Unique {
2422 input: Box<HydroNode>,
2423 metadata: HydroIrMetadata,
2424 },
2425
2426 Sort {
2427 input: Box<HydroNode>,
2428 metadata: HydroIrMetadata,
2429 },
2430 Fold {
2431 init: ClosureExpr,
2432 acc: ClosureExpr,
2433 input: Box<HydroNode>,
2434 metadata: HydroIrMetadata,
2435 },
2436
2437 Scan {
2438 init: ClosureExpr,
2439 acc: ClosureExpr,
2440 input: Box<HydroNode>,
2441 metadata: HydroIrMetadata,
2442 },
2443 ScanAsyncBlocking {
2444 init: ClosureExpr,
2445 acc: ClosureExpr,
2446 input: Box<HydroNode>,
2447 metadata: HydroIrMetadata,
2448 },
2449 FoldKeyed {
2450 init: ClosureExpr,
2451 acc: ClosureExpr,
2452 input: Box<HydroNode>,
2453 metadata: HydroIrMetadata,
2454 },
2455
2456 Reduce {
2457 f: ClosureExpr,
2458 input: Box<HydroNode>,
2459 metadata: HydroIrMetadata,
2460 },
2461 ReduceKeyed {
2462 f: ClosureExpr,
2463 input: Box<HydroNode>,
2464 metadata: HydroIrMetadata,
2465 },
2466 ReduceKeyedWatermark {
2467 f: ClosureExpr,
2468 input: Box<HydroNode>,
2469 watermark: Box<HydroNode>,
2470 metadata: HydroIrMetadata,
2471 },
2472
2473 Network {
2474 name: Option<String>,
2475 networking_info: crate::networking::NetworkingInfo,
2476 serialize_fn: Option<DebugExpr>,
2477 instantiate_fn: DebugInstantiate,
2478 deserialize_fn: Option<DebugExpr>,
2479 input: Box<HydroNode>,
2480 metadata: HydroIrMetadata,
2481 },
2482
2483 ExternalInput {
2484 from_external_key: LocationKey,
2485 from_port_id: ExternalPortId,
2486 from_many: bool,
2487 codec_type: DebugType,
2488 #[serde(skip)]
2489 port_hint: NetworkHint,
2490 instantiate_fn: DebugInstantiate,
2491 deserialize_fn: Option<DebugExpr>,
2492 metadata: HydroIrMetadata,
2493 },
2494
2495 Counter {
2496 tag: String,
2497 duration: DebugExpr,
2498 prefix: String,
2499 input: Box<HydroNode>,
2500 metadata: HydroIrMetadata,
2501 },
2502
2503 AssertIsConsistent {
2504 inner: Box<HydroNode>,
2505 trusted: bool,
2506 metadata: HydroIrMetadata,
2507 },
2508
2509 UnboundSingleton {
2510 inner: Box<HydroNode>,
2511 metadata: HydroIrMetadata,
2512 },
2513}
2514
2515pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2516pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2517
2518impl HydroNode {
2519 pub fn transform_bottom_up(
2520 &mut self,
2521 transform: &mut impl FnMut(&mut HydroNode),
2522 seen_tees: &mut SeenSharedNodes,
2523 check_well_formed: bool,
2524 ) {
2525 self.transform_children(
2526 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2527 seen_tees,
2528 );
2529
2530 transform(self);
2531
2532 let self_location = self.metadata().location_id.root();
2533
2534 if check_well_formed {
2535 match &*self {
2536 HydroNode::Network { .. } => {}
2537 _ => {
2538 self.input_metadata().iter().for_each(|i| {
2539 if i.location_id.root() != self_location {
2540 panic!(
2541 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2542 i,
2543 i.location_id.root(),
2544 self,
2545 self_location
2546 )
2547 }
2548 });
2549 }
2550 }
2551 }
2552 }
2553
2554 #[inline(always)]
2555 pub fn transform_children(
2556 &mut self,
2557 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2558 seen_tees: &mut SeenSharedNodes,
2559 ) {
2560 match self {
2561 HydroNode::Placeholder => {
2562 panic!();
2563 }
2564
2565 HydroNode::Source { .. }
2566 | HydroNode::SingletonSource { .. }
2567 | HydroNode::CycleSource { .. }
2568 | HydroNode::ExternalInput { .. } => {}
2569
2570 HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2571 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2572 *inner = SharedNode(transformed.clone());
2573 } else {
2574 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2575 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2576 let mut orig = inner.0.replace(HydroNode::Placeholder);
2577 transform(&mut orig, seen_tees);
2578 *transformed_cell.borrow_mut() = orig;
2579 *inner = SharedNode(transformed_cell);
2580 }
2581 }
2582
2583 HydroNode::Partition { inner, f, .. } => {
2584 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2585 *inner = SharedNode(transformed.clone());
2586 } else {
2587 f.transform_children(&mut transform, seen_tees);
2588 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2589 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2590 let mut orig = inner.0.replace(HydroNode::Placeholder);
2591 transform(&mut orig, seen_tees);
2592 *transformed_cell.borrow_mut() = orig;
2593 *inner = SharedNode(transformed_cell);
2594 }
2595 }
2596
2597 HydroNode::Cast { inner, .. }
2598 | HydroNode::ObserveNonDet { inner, .. }
2599 | HydroNode::BeginAtomic { inner, .. }
2600 | HydroNode::EndAtomic { inner, .. }
2601 | HydroNode::Batch { inner, .. }
2602 | HydroNode::YieldConcat { inner, .. }
2603 | HydroNode::UnboundSingleton { inner, .. }
2604 | HydroNode::AssertIsConsistent { inner, .. } => {
2605 transform(inner.as_mut(), seen_tees);
2606 }
2607
2608 HydroNode::Chain { first, second, .. } => {
2609 transform(first.as_mut(), seen_tees);
2610 transform(second.as_mut(), seen_tees);
2611 }
2612
2613 HydroNode::MergeOrdered { first, second, .. } => {
2614 transform(first.as_mut(), seen_tees);
2615 transform(second.as_mut(), seen_tees);
2616 }
2617
2618 HydroNode::ChainFirst { first, second, .. } => {
2619 transform(first.as_mut(), seen_tees);
2620 transform(second.as_mut(), seen_tees);
2621 }
2622
2623 HydroNode::CrossSingleton { left, right, .. }
2624 | HydroNode::CrossProduct { left, right, .. }
2625 | HydroNode::Join { left, right, .. }
2626 | HydroNode::JoinHalf { left, right, .. } => {
2627 transform(left.as_mut(), seen_tees);
2628 transform(right.as_mut(), seen_tees);
2629 }
2630
2631 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2632 transform(pos.as_mut(), seen_tees);
2633 transform(neg.as_mut(), seen_tees);
2634 }
2635
2636 HydroNode::Map { f, input, .. } => {
2637 f.transform_children(&mut transform, seen_tees);
2638 transform(input.as_mut(), seen_tees);
2639 }
2640 HydroNode::FlatMap { f, input, .. }
2641 | HydroNode::FlatMapStreamBlocking { f, input, .. }
2642 | HydroNode::Filter { f, input, .. }
2643 | HydroNode::FilterMap { f, input, .. }
2644 | HydroNode::Inspect { f, input, .. }
2645 | HydroNode::Reduce { f, input, .. }
2646 | HydroNode::ReduceKeyed { f, input, .. } => {
2647 f.transform_children(&mut transform, seen_tees);
2648 transform(input.as_mut(), seen_tees);
2649 }
2650 HydroNode::ReduceKeyedWatermark {
2651 f,
2652 input,
2653 watermark,
2654 ..
2655 } => {
2656 f.transform_children(&mut transform, seen_tees);
2657 transform(input.as_mut(), seen_tees);
2658 transform(watermark.as_mut(), seen_tees);
2659 }
2660 HydroNode::Fold {
2661 init, acc, input, ..
2662 }
2663 | HydroNode::Scan {
2664 init, acc, input, ..
2665 }
2666 | HydroNode::ScanAsyncBlocking {
2667 init, acc, input, ..
2668 }
2669 | HydroNode::FoldKeyed {
2670 init, acc, input, ..
2671 } => {
2672 init.transform_children(&mut transform, seen_tees);
2673 acc.transform_children(&mut transform, seen_tees);
2674 transform(input.as_mut(), seen_tees);
2675 }
2676 HydroNode::ResolveFutures { input, .. }
2677 | HydroNode::ResolveFuturesBlocking { input, .. }
2678 | HydroNode::ResolveFuturesOrdered { input, .. }
2679 | HydroNode::Sort { input, .. }
2680 | HydroNode::DeferTick { input, .. }
2681 | HydroNode::Enumerate { input, .. }
2682 | HydroNode::Unique { input, .. }
2683 | HydroNode::Network { input, .. }
2684 | HydroNode::Counter { input, .. } => {
2685 transform(input.as_mut(), seen_tees);
2686 }
2687 }
2688 }
2689
2690 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2691 match self {
2692 HydroNode::Placeholder => HydroNode::Placeholder,
2693 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2694 inner: Box::new(inner.deep_clone(seen_tees)),
2695 metadata: metadata.clone(),
2696 },
2697 HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2698 inner: Box::new(inner.deep_clone(seen_tees)),
2699 metadata: metadata.clone(),
2700 },
2701 HydroNode::ObserveNonDet {
2702 inner,
2703 trusted,
2704 metadata,
2705 } => HydroNode::ObserveNonDet {
2706 inner: Box::new(inner.deep_clone(seen_tees)),
2707 trusted: *trusted,
2708 metadata: metadata.clone(),
2709 },
2710 HydroNode::AssertIsConsistent {
2711 inner,
2712 trusted,
2713 metadata,
2714 } => HydroNode::AssertIsConsistent {
2715 inner: Box::new(inner.deep_clone(seen_tees)),
2716 trusted: *trusted,
2717 metadata: metadata.clone(),
2718 },
2719 HydroNode::Source { source, metadata } => HydroNode::Source {
2720 source: source.clone(),
2721 metadata: metadata.clone(),
2722 },
2723 HydroNode::SingletonSource {
2724 value,
2725 first_tick_only,
2726 metadata,
2727 } => HydroNode::SingletonSource {
2728 value: value.clone(),
2729 first_tick_only: *first_tick_only,
2730 metadata: metadata.clone(),
2731 },
2732 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2733 cycle_id: *cycle_id,
2734 metadata: metadata.clone(),
2735 },
2736 HydroNode::Tee { inner, metadata }
2737 | HydroNode::Reference {
2738 inner, metadata, ..
2739 } => {
2740 let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2741 SharedNode(transformed.clone())
2742 } else {
2743 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2744 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2745 let cloned = inner.0.borrow().deep_clone(seen_tees);
2746 *new_rc.borrow_mut() = cloned;
2747 SharedNode(new_rc)
2748 };
2749 if let HydroNode::Reference { kind, .. } = self {
2750 HydroNode::Reference {
2751 inner: cloned_inner,
2752 kind: *kind,
2753 metadata: metadata.clone(),
2754 }
2755 } else {
2756 HydroNode::Tee {
2757 inner: cloned_inner,
2758 metadata: metadata.clone(),
2759 }
2760 }
2761 }
2762 HydroNode::Partition {
2763 inner,
2764 f,
2765 is_true,
2766 metadata,
2767 } => {
2768 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2769 HydroNode::Partition {
2770 inner: SharedNode(transformed.clone()),
2771 f: f.deep_clone(seen_tees),
2772 is_true: *is_true,
2773 metadata: metadata.clone(),
2774 }
2775 } else {
2776 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2777 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2778 let cloned = inner.0.borrow().deep_clone(seen_tees);
2779 *new_rc.borrow_mut() = cloned;
2780 HydroNode::Partition {
2781 inner: SharedNode(new_rc),
2782 f: f.deep_clone(seen_tees),
2783 is_true: *is_true,
2784 metadata: metadata.clone(),
2785 }
2786 }
2787 }
2788 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2789 inner: Box::new(inner.deep_clone(seen_tees)),
2790 metadata: metadata.clone(),
2791 },
2792 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2793 inner: Box::new(inner.deep_clone(seen_tees)),
2794 metadata: metadata.clone(),
2795 },
2796 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2797 inner: Box::new(inner.deep_clone(seen_tees)),
2798 metadata: metadata.clone(),
2799 },
2800 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2801 inner: Box::new(inner.deep_clone(seen_tees)),
2802 metadata: metadata.clone(),
2803 },
2804 HydroNode::Chain {
2805 first,
2806 second,
2807 metadata,
2808 } => HydroNode::Chain {
2809 first: Box::new(first.deep_clone(seen_tees)),
2810 second: Box::new(second.deep_clone(seen_tees)),
2811 metadata: metadata.clone(),
2812 },
2813 HydroNode::MergeOrdered {
2814 first,
2815 second,
2816 metadata,
2817 } => HydroNode::MergeOrdered {
2818 first: Box::new(first.deep_clone(seen_tees)),
2819 second: Box::new(second.deep_clone(seen_tees)),
2820 metadata: metadata.clone(),
2821 },
2822 HydroNode::ChainFirst {
2823 first,
2824 second,
2825 metadata,
2826 } => HydroNode::ChainFirst {
2827 first: Box::new(first.deep_clone(seen_tees)),
2828 second: Box::new(second.deep_clone(seen_tees)),
2829 metadata: metadata.clone(),
2830 },
2831 HydroNode::CrossProduct {
2832 left,
2833 right,
2834 metadata,
2835 } => HydroNode::CrossProduct {
2836 left: Box::new(left.deep_clone(seen_tees)),
2837 right: Box::new(right.deep_clone(seen_tees)),
2838 metadata: metadata.clone(),
2839 },
2840 HydroNode::CrossSingleton {
2841 left,
2842 right,
2843 metadata,
2844 } => HydroNode::CrossSingleton {
2845 left: Box::new(left.deep_clone(seen_tees)),
2846 right: Box::new(right.deep_clone(seen_tees)),
2847 metadata: metadata.clone(),
2848 },
2849 HydroNode::Join {
2850 left,
2851 right,
2852 metadata,
2853 } => HydroNode::Join {
2854 left: Box::new(left.deep_clone(seen_tees)),
2855 right: Box::new(right.deep_clone(seen_tees)),
2856 metadata: metadata.clone(),
2857 },
2858 HydroNode::JoinHalf {
2859 left,
2860 right,
2861 metadata,
2862 } => HydroNode::JoinHalf {
2863 left: Box::new(left.deep_clone(seen_tees)),
2864 right: Box::new(right.deep_clone(seen_tees)),
2865 metadata: metadata.clone(),
2866 },
2867 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2868 pos: Box::new(pos.deep_clone(seen_tees)),
2869 neg: Box::new(neg.deep_clone(seen_tees)),
2870 metadata: metadata.clone(),
2871 },
2872 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2873 pos: Box::new(pos.deep_clone(seen_tees)),
2874 neg: Box::new(neg.deep_clone(seen_tees)),
2875 metadata: metadata.clone(),
2876 },
2877 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2878 input: Box::new(input.deep_clone(seen_tees)),
2879 metadata: metadata.clone(),
2880 },
2881 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2882 HydroNode::ResolveFuturesBlocking {
2883 input: Box::new(input.deep_clone(seen_tees)),
2884 metadata: metadata.clone(),
2885 }
2886 }
2887 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2888 HydroNode::ResolveFuturesOrdered {
2889 input: Box::new(input.deep_clone(seen_tees)),
2890 metadata: metadata.clone(),
2891 }
2892 }
2893 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2894 f: f.deep_clone(seen_tees),
2895 input: Box::new(input.deep_clone(seen_tees)),
2896 metadata: metadata.clone(),
2897 },
2898 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2899 f: f.deep_clone(seen_tees),
2900 input: Box::new(input.deep_clone(seen_tees)),
2901 metadata: metadata.clone(),
2902 },
2903 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2904 HydroNode::FlatMapStreamBlocking {
2905 f: f.deep_clone(seen_tees),
2906 input: Box::new(input.deep_clone(seen_tees)),
2907 metadata: metadata.clone(),
2908 }
2909 }
2910 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2911 f: f.deep_clone(seen_tees),
2912 input: Box::new(input.deep_clone(seen_tees)),
2913 metadata: metadata.clone(),
2914 },
2915 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2916 f: f.deep_clone(seen_tees),
2917 input: Box::new(input.deep_clone(seen_tees)),
2918 metadata: metadata.clone(),
2919 },
2920 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2921 input: Box::new(input.deep_clone(seen_tees)),
2922 metadata: metadata.clone(),
2923 },
2924 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2925 input: Box::new(input.deep_clone(seen_tees)),
2926 metadata: metadata.clone(),
2927 },
2928 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2929 f: f.deep_clone(seen_tees),
2930 input: Box::new(input.deep_clone(seen_tees)),
2931 metadata: metadata.clone(),
2932 },
2933 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2934 input: Box::new(input.deep_clone(seen_tees)),
2935 metadata: metadata.clone(),
2936 },
2937 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2938 input: Box::new(input.deep_clone(seen_tees)),
2939 metadata: metadata.clone(),
2940 },
2941 HydroNode::Fold {
2942 init,
2943 acc,
2944 input,
2945 metadata,
2946 } => HydroNode::Fold {
2947 init: init.deep_clone(seen_tees),
2948 acc: acc.deep_clone(seen_tees),
2949 input: Box::new(input.deep_clone(seen_tees)),
2950 metadata: metadata.clone(),
2951 },
2952 HydroNode::Scan {
2953 init,
2954 acc,
2955 input,
2956 metadata,
2957 } => HydroNode::Scan {
2958 init: init.deep_clone(seen_tees),
2959 acc: acc.deep_clone(seen_tees),
2960 input: Box::new(input.deep_clone(seen_tees)),
2961 metadata: metadata.clone(),
2962 },
2963 HydroNode::ScanAsyncBlocking {
2964 init,
2965 acc,
2966 input,
2967 metadata,
2968 } => HydroNode::ScanAsyncBlocking {
2969 init: init.deep_clone(seen_tees),
2970 acc: acc.deep_clone(seen_tees),
2971 input: Box::new(input.deep_clone(seen_tees)),
2972 metadata: metadata.clone(),
2973 },
2974 HydroNode::FoldKeyed {
2975 init,
2976 acc,
2977 input,
2978 metadata,
2979 } => HydroNode::FoldKeyed {
2980 init: init.deep_clone(seen_tees),
2981 acc: acc.deep_clone(seen_tees),
2982 input: Box::new(input.deep_clone(seen_tees)),
2983 metadata: metadata.clone(),
2984 },
2985 HydroNode::ReduceKeyedWatermark {
2986 f,
2987 input,
2988 watermark,
2989 metadata,
2990 } => HydroNode::ReduceKeyedWatermark {
2991 f: f.deep_clone(seen_tees),
2992 input: Box::new(input.deep_clone(seen_tees)),
2993 watermark: Box::new(watermark.deep_clone(seen_tees)),
2994 metadata: metadata.clone(),
2995 },
2996 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2997 f: f.deep_clone(seen_tees),
2998 input: Box::new(input.deep_clone(seen_tees)),
2999 metadata: metadata.clone(),
3000 },
3001 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3002 f: f.deep_clone(seen_tees),
3003 input: Box::new(input.deep_clone(seen_tees)),
3004 metadata: metadata.clone(),
3005 },
3006 HydroNode::Network {
3007 name,
3008 networking_info,
3009 serialize_fn,
3010 instantiate_fn,
3011 deserialize_fn,
3012 input,
3013 metadata,
3014 } => HydroNode::Network {
3015 name: name.clone(),
3016 networking_info: networking_info.clone(),
3017 serialize_fn: serialize_fn.clone(),
3018 instantiate_fn: instantiate_fn.clone(),
3019 deserialize_fn: deserialize_fn.clone(),
3020 input: Box::new(input.deep_clone(seen_tees)),
3021 metadata: metadata.clone(),
3022 },
3023 HydroNode::ExternalInput {
3024 from_external_key,
3025 from_port_id,
3026 from_many,
3027 codec_type,
3028 port_hint,
3029 instantiate_fn,
3030 deserialize_fn,
3031 metadata,
3032 } => HydroNode::ExternalInput {
3033 from_external_key: *from_external_key,
3034 from_port_id: *from_port_id,
3035 from_many: *from_many,
3036 codec_type: codec_type.clone(),
3037 port_hint: *port_hint,
3038 instantiate_fn: instantiate_fn.clone(),
3039 deserialize_fn: deserialize_fn.clone(),
3040 metadata: metadata.clone(),
3041 },
3042 HydroNode::Counter {
3043 tag,
3044 duration,
3045 prefix,
3046 input,
3047 metadata,
3048 } => HydroNode::Counter {
3049 tag: tag.clone(),
3050 duration: duration.clone(),
3051 prefix: prefix.clone(),
3052 input: Box::new(input.deep_clone(seen_tees)),
3053 metadata: metadata.clone(),
3054 },
3055 }
3056 }
3057
3058 #[cfg(feature = "build")]
3059 pub fn emit_core(
3060 &mut self,
3061 builders_or_callback: &mut BuildersOrCallback<
3062 impl FnMut(&mut HydroRoot, &mut StmtId),
3063 impl FnMut(&mut HydroNode, &mut StmtId),
3064 >,
3065 seen_tees: &mut SeenSharedNodes,
3066 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3067 next_stmt_id: &mut StmtId,
3068 fold_hooked_idents: &mut HashSet<String>,
3069 ) -> syn::Ident {
3070 let mut ident_stack: Vec<syn::Ident> = Vec::new();
3071 let mut singleton_access_counters: HashMap<*const RefCell<HydroNode>, u32> = HashMap::new();
3072
3073 self.transform_bottom_up(
3074 &mut |node: &mut HydroNode| {
3075 let out_location = node.metadata().location_id.clone();
3076 match node {
3077 HydroNode::Placeholder => {
3078 panic!()
3079 }
3080
3081 HydroNode::Cast { .. } => {
3082 match builders_or_callback {
3085 BuildersOrCallback::Builders(_) => {}
3086 BuildersOrCallback::Callback(_, node_callback) => {
3087 node_callback(node, next_stmt_id);
3088 }
3089 }
3090
3091 let _ = next_stmt_id.get_and_increment();
3092 }
3094
3095 HydroNode::UnboundSingleton { .. } => {
3096 let inner_ident = ident_stack.pop().unwrap();
3097
3098 let out_ident =
3099 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3100
3101 match builders_or_callback {
3102 BuildersOrCallback::Builders(graph_builders) => {
3103 if graph_builders.singleton_intermediates() {
3104 let builder = graph_builders.get_dfir_mut(&out_location);
3105 builder.add_dfir(
3106 parse_quote! {
3107 #out_ident = #inner_ident;
3108 },
3109 None,
3110 None,
3111 );
3112 } else {
3113 let builder = graph_builders.get_dfir_mut(&out_location);
3114 builder.add_dfir(
3115 parse_quote! {
3116 #out_ident = #inner_ident -> persist::<'static>();
3117 },
3118 None,
3119 None,
3120 );
3121 }
3122 }
3123 BuildersOrCallback::Callback(_, node_callback) => {
3124 node_callback(node, next_stmt_id);
3125 }
3126 }
3127
3128 let _ = next_stmt_id.get_and_increment();
3129
3130 ident_stack.push(out_ident);
3131 }
3132
3133 HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3134 let inner_ident = ident_stack.pop().unwrap();
3135
3136 let out_ident =
3137 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3138
3139 match builders_or_callback {
3140 BuildersOrCallback::Builders(graph_builders) => {
3141 graph_builders.assert_is_consistent(
3142 *trusted,
3143 &inner.metadata().location_id,
3144 inner_ident,
3145 &out_ident,
3146 );
3147 }
3148 BuildersOrCallback::Callback(_, node_callback) => {
3149 node_callback(node, next_stmt_id);
3150 }
3151 }
3152
3153 let _ = next_stmt_id.get_and_increment();
3154
3155 ident_stack.push(out_ident);
3156 }
3157
3158 HydroNode::ObserveNonDet {
3159 inner,
3160 trusted,
3161 metadata,
3162 ..
3163 } => {
3164 let inner_ident = ident_stack.pop().unwrap();
3165
3166 let observe_ident =
3167 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3168
3169 match builders_or_callback {
3170 BuildersOrCallback::Builders(graph_builders) => {
3171 graph_builders.observe_nondet(
3172 *trusted,
3173 &inner.metadata().location_id,
3174 inner_ident,
3175 &inner.metadata().collection_kind,
3176 &observe_ident,
3177 &metadata.collection_kind,
3178 &metadata.op,
3179 );
3180 }
3181 BuildersOrCallback::Callback(_, node_callback) => {
3182 node_callback(node, next_stmt_id);
3183 }
3184 }
3185
3186 let _ = next_stmt_id.get_and_increment();
3187
3188 ident_stack.push(observe_ident);
3189 }
3190
3191 HydroNode::Batch {
3192 inner, metadata, ..
3193 } => {
3194 let inner_ident = ident_stack.pop().unwrap();
3195
3196 let batch_ident =
3197 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3198
3199 match builders_or_callback {
3200 BuildersOrCallback::Builders(graph_builders) => {
3201 graph_builders.batch(
3202 inner_ident,
3203 &inner.metadata().location_id,
3204 &inner.metadata().collection_kind,
3205 &batch_ident,
3206 &out_location,
3207 &metadata.op,
3208 fold_hooked_idents,
3209 );
3210 }
3211 BuildersOrCallback::Callback(_, node_callback) => {
3212 node_callback(node, next_stmt_id);
3213 }
3214 }
3215
3216 let _ = next_stmt_id.get_and_increment();
3217
3218 ident_stack.push(batch_ident);
3219 }
3220
3221 HydroNode::YieldConcat { inner, .. } => {
3222 let inner_ident = ident_stack.pop().unwrap();
3223
3224 let yield_ident =
3225 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3226
3227 match builders_or_callback {
3228 BuildersOrCallback::Builders(graph_builders) => {
3229 graph_builders.yield_from_tick(
3230 inner_ident,
3231 &inner.metadata().location_id,
3232 &inner.metadata().collection_kind,
3233 &yield_ident,
3234 &out_location,
3235 );
3236 }
3237 BuildersOrCallback::Callback(_, node_callback) => {
3238 node_callback(node, next_stmt_id);
3239 }
3240 }
3241
3242 let _ = next_stmt_id.get_and_increment();
3243
3244 ident_stack.push(yield_ident);
3245 }
3246
3247 HydroNode::BeginAtomic { inner, metadata } => {
3248 let inner_ident = ident_stack.pop().unwrap();
3249
3250 let begin_ident =
3251 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3252
3253 match builders_or_callback {
3254 BuildersOrCallback::Builders(graph_builders) => {
3255 graph_builders.begin_atomic(
3256 inner_ident,
3257 &inner.metadata().location_id,
3258 &inner.metadata().collection_kind,
3259 &begin_ident,
3260 &out_location,
3261 &metadata.op,
3262 );
3263 }
3264 BuildersOrCallback::Callback(_, node_callback) => {
3265 node_callback(node, next_stmt_id);
3266 }
3267 }
3268
3269 let _ = next_stmt_id.get_and_increment();
3270
3271 ident_stack.push(begin_ident);
3272 }
3273
3274 HydroNode::EndAtomic { inner, .. } => {
3275 let inner_ident = ident_stack.pop().unwrap();
3276
3277 let end_ident =
3278 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3279
3280 match builders_or_callback {
3281 BuildersOrCallback::Builders(graph_builders) => {
3282 graph_builders.end_atomic(
3283 inner_ident,
3284 &inner.metadata().location_id,
3285 &inner.metadata().collection_kind,
3286 &end_ident,
3287 );
3288 }
3289 BuildersOrCallback::Callback(_, node_callback) => {
3290 node_callback(node, next_stmt_id);
3291 }
3292 }
3293
3294 let _ = next_stmt_id.get_and_increment();
3295
3296 ident_stack.push(end_ident);
3297 }
3298
3299 HydroNode::Source {
3300 source, metadata, ..
3301 } => {
3302 if let HydroSource::ExternalNetwork() = source {
3303 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3304 } else {
3305 let source_ident =
3306 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3307
3308 let source_stmt = match source {
3309 HydroSource::Stream(expr) => {
3310 debug_assert!(metadata.location_id.is_top_level());
3311 parse_quote! {
3312 #source_ident = source_stream(#expr);
3313 }
3314 }
3315
3316 HydroSource::ExternalNetwork() => {
3317 unreachable!()
3318 }
3319
3320 HydroSource::Iter(expr) => {
3321 if metadata.location_id.is_top_level() {
3322 parse_quote! {
3323 #source_ident = source_iter(#expr);
3324 }
3325 } else {
3326 parse_quote! {
3328 #source_ident = source_iter(#expr) -> persist::<'static>();
3329 }
3330 }
3331 }
3332
3333 HydroSource::Spin() => {
3334 debug_assert!(metadata.location_id.is_top_level());
3335 parse_quote! {
3336 #source_ident = spin();
3337 }
3338 }
3339
3340 HydroSource::ClusterMembers(target_loc, state) => {
3341 debug_assert!(metadata.location_id.is_top_level());
3342
3343 let members_tee_ident = syn::Ident::new(
3344 &format!(
3345 "__cluster_members_tee_{}_{}",
3346 metadata.location_id.root().key(),
3347 target_loc.key(),
3348 ),
3349 Span::call_site(),
3350 );
3351
3352 match state {
3353 ClusterMembersState::Stream(d) => {
3354 parse_quote! {
3355 #members_tee_ident = source_stream(#d) -> tee();
3356 #source_ident = #members_tee_ident;
3357 }
3358 },
3359 ClusterMembersState::Uninit => syn::parse_quote! {
3360 #source_ident = source_stream(DUMMY);
3361 },
3362 ClusterMembersState::Tee(..) => parse_quote! {
3363 #source_ident = #members_tee_ident;
3364 },
3365 }
3366 }
3367
3368 HydroSource::Embedded(ident) => {
3369 parse_quote! {
3370 #source_ident = source_stream(#ident);
3371 }
3372 }
3373
3374 HydroSource::EmbeddedSingleton(ident) => {
3375 parse_quote! {
3376 #source_ident = source_iter([#ident]);
3377 }
3378 }
3379 };
3380
3381 match builders_or_callback {
3382 BuildersOrCallback::Builders(graph_builders) => {
3383 let builder = graph_builders.get_dfir_mut(&out_location);
3384 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
3385 }
3386 BuildersOrCallback::Callback(_, node_callback) => {
3387 node_callback(node, next_stmt_id);
3388 }
3389 }
3390
3391 let _ = next_stmt_id.get_and_increment();
3392
3393 ident_stack.push(source_ident);
3394 }
3395 }
3396
3397 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3398 let source_ident =
3399 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3400
3401 match builders_or_callback {
3402 BuildersOrCallback::Builders(graph_builders) => {
3403 let builder = graph_builders.get_dfir_mut(&out_location);
3404
3405 if *first_tick_only {
3406 assert!(
3407 !metadata.location_id.is_top_level(),
3408 "first_tick_only SingletonSource must be inside a tick"
3409 );
3410 }
3411
3412 if *first_tick_only
3413 || (metadata.location_id.is_top_level()
3414 && metadata.collection_kind.is_bounded())
3415 {
3416 builder.add_dfir(
3417 parse_quote! {
3418 #source_ident = source_iter([#value]);
3419 },
3420 None,
3421 Some(&next_stmt_id.to_string()),
3422 );
3423 } else {
3424 builder.add_dfir(
3425 parse_quote! {
3426 #source_ident = source_iter([#value]) -> persist::<'static>();
3427 },
3428 None,
3429 Some(&next_stmt_id.to_string()),
3430 );
3431 }
3432 }
3433 BuildersOrCallback::Callback(_, node_callback) => {
3434 node_callback(node, next_stmt_id);
3435 }
3436 }
3437
3438 let _ = next_stmt_id.get_and_increment();
3439
3440 ident_stack.push(source_ident);
3441 }
3442
3443 HydroNode::CycleSource { cycle_id, .. } => {
3444 let ident = cycle_id.as_ident();
3445
3446 match builders_or_callback {
3447 BuildersOrCallback::Builders(_) => {}
3448 BuildersOrCallback::Callback(_, node_callback) => {
3449 node_callback(node, next_stmt_id);
3450 }
3451 }
3452
3453 let _ = next_stmt_id.get_and_increment();
3455
3456 ident_stack.push(ident);
3457 }
3458
3459 HydroNode::Tee { inner, .. } => {
3460 let ret_ident = if let Some(built_idents) =
3461 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3462 {
3463 match builders_or_callback {
3464 BuildersOrCallback::Builders(_) => {}
3465 BuildersOrCallback::Callback(_, node_callback) => {
3466 node_callback(node, next_stmt_id);
3467 }
3468 }
3469
3470 built_idents[0].clone()
3471 } else {
3472 let inner_ident = ident_stack.pop().unwrap();
3475
3476 let tee_ident =
3477 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3478
3479 built_tees.insert(
3480 inner.0.as_ref() as *const RefCell<HydroNode>,
3481 vec![tee_ident.clone()],
3482 );
3483
3484 match builders_or_callback {
3485 BuildersOrCallback::Builders(graph_builders) => {
3486 if fold_hooked_idents.contains(&inner_ident.to_string()) {
3498 fold_hooked_idents.insert(tee_ident.to_string());
3499 }
3500 let builder = graph_builders.get_dfir_mut(&out_location);
3501 builder.add_dfir(
3502 parse_quote! {
3503 #tee_ident = #inner_ident -> tee();
3504 },
3505 None,
3506 Some(&next_stmt_id.to_string()),
3507 );
3508 }
3509 BuildersOrCallback::Callback(_, node_callback) => {
3510 node_callback(node, next_stmt_id);
3511 }
3512 }
3513
3514 tee_ident
3515 };
3516
3517 let _ = next_stmt_id.get_and_increment();
3521 ident_stack.push(ret_ident);
3522 }
3523
3524 HydroNode::Reference { inner, kind, .. } => {
3525 let ret_ident = if let Some(built_idents) =
3526 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3527 {
3528 built_idents[0].clone()
3529 } else {
3530 let inner_ident = ident_stack.pop().unwrap();
3531
3532 let ref_ident =
3533 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3534
3535 built_tees.insert(
3536 inner.0.as_ref() as *const RefCell<HydroNode>,
3537 vec![ref_ident.clone()],
3538 );
3539
3540 match builders_or_callback {
3541 BuildersOrCallback::Builders(graph_builders) => {
3542 let builder = graph_builders.get_dfir_mut(&out_location);
3543 let op_ident = syn::Ident::new(
3544 match kind {
3545 crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3546 crate::handoff_ref::HandoffRefKind::Optional => "optional",
3547 crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3548 },
3549 Span::call_site(),
3550 );
3551 builder.add_dfir(
3552 parse_quote! {
3553 #ref_ident = #inner_ident -> #op_ident();
3554 },
3555 None,
3556 Some(&next_stmt_id.to_string()),
3557 );
3558 }
3559 BuildersOrCallback::Callback(_, node_callback) => {
3560 node_callback(node, next_stmt_id);
3561 }
3562 }
3563
3564 ref_ident
3565 };
3566
3567 let _ = next_stmt_id.get_and_increment();
3570 ident_stack.push(ret_ident);
3571 }
3572
3573 HydroNode::Partition {
3574 inner, f, is_true, ..
3575 } => {
3576 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
3578 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
3579 match builders_or_callback {
3580 BuildersOrCallback::Builders(_) => {}
3581 BuildersOrCallback::Callback(_, node_callback) => {
3582 node_callback(node, next_stmt_id);
3583 }
3584 }
3585
3586 let idx = if is_true { 0 } else { 1 };
3587 built_idents[idx].clone()
3588 } else {
3589 let inner_ident = ident_stack.pop().unwrap();
3592 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
3593
3594 let partition_ident = syn::Ident::new(
3595 &format!("stream_{}_partition", *next_stmt_id),
3596 Span::call_site(),
3597 );
3598 let true_ident = syn::Ident::new(
3599 &format!("stream_{}_true", *next_stmt_id),
3600 Span::call_site(),
3601 );
3602 let false_ident = syn::Ident::new(
3603 &format!("stream_{}_false", *next_stmt_id),
3604 Span::call_site(),
3605 );
3606
3607 built_tees.insert(
3608 ptr,
3609 vec![true_ident.clone(), false_ident.clone()],
3610 );
3611
3612 match builders_or_callback {
3613 BuildersOrCallback::Builders(graph_builders) => {
3614 let builder = graph_builders.get_dfir_mut(&out_location);
3615 builder.add_dfir(
3616 parse_quote! {
3617 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
3618 #true_ident = #partition_ident[0];
3619 #false_ident = #partition_ident[1];
3620 },
3621 None,
3622 Some(&next_stmt_id.to_string()),
3623 );
3624 }
3625 BuildersOrCallback::Callback(_, node_callback) => {
3626 node_callback(node, next_stmt_id);
3627 }
3628 }
3629
3630 if is_true { true_ident } else { false_ident }
3631 };
3632
3633 let _ = next_stmt_id.get_and_increment();
3634 ident_stack.push(ret_ident);
3635 }
3636
3637 HydroNode::Chain { .. } => {
3638 let second_ident = ident_stack.pop().unwrap();
3640 let first_ident = ident_stack.pop().unwrap();
3641
3642 let chain_ident =
3643 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3644
3645 match builders_or_callback {
3646 BuildersOrCallback::Builders(graph_builders) => {
3647 let builder = graph_builders.get_dfir_mut(&out_location);
3648 builder.add_dfir(
3649 parse_quote! {
3650 #chain_ident = chain();
3651 #first_ident -> [0]#chain_ident;
3652 #second_ident -> [1]#chain_ident;
3653 },
3654 None,
3655 Some(&next_stmt_id.to_string()),
3656 );
3657 }
3658 BuildersOrCallback::Callback(_, node_callback) => {
3659 node_callback(node, next_stmt_id);
3660 }
3661 }
3662
3663 let _ = next_stmt_id.get_and_increment();
3664
3665 ident_stack.push(chain_ident);
3666 }
3667
3668 HydroNode::MergeOrdered { first, metadata, .. } => {
3669 let second_ident = ident_stack.pop().unwrap();
3670 let first_ident = ident_stack.pop().unwrap();
3671
3672 let merge_ident =
3673 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3674
3675 match builders_or_callback {
3676 BuildersOrCallback::Builders(graph_builders) => {
3677 graph_builders.merge_ordered(
3678 &first.metadata().location_id,
3679 first_ident,
3680 second_ident,
3681 &merge_ident,
3682 &first.metadata().collection_kind,
3683 &metadata.op,
3684 Some(&next_stmt_id.to_string()),
3685 );
3686 }
3687 BuildersOrCallback::Callback(_, node_callback) => {
3688 node_callback(node, next_stmt_id);
3689 }
3690 }
3691
3692 let _ = next_stmt_id.get_and_increment();
3693
3694 ident_stack.push(merge_ident);
3695 }
3696
3697 HydroNode::ChainFirst { .. } => {
3698 let second_ident = ident_stack.pop().unwrap();
3699 let first_ident = ident_stack.pop().unwrap();
3700
3701 let chain_ident =
3702 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3703
3704 match builders_or_callback {
3705 BuildersOrCallback::Builders(graph_builders) => {
3706 let builder = graph_builders.get_dfir_mut(&out_location);
3707 builder.add_dfir(
3708 parse_quote! {
3709 #chain_ident = chain_first_n(1);
3710 #first_ident -> [0]#chain_ident;
3711 #second_ident -> [1]#chain_ident;
3712 },
3713 None,
3714 Some(&next_stmt_id.to_string()),
3715 );
3716 }
3717 BuildersOrCallback::Callback(_, node_callback) => {
3718 node_callback(node, next_stmt_id);
3719 }
3720 }
3721
3722 let _ = next_stmt_id.get_and_increment();
3723
3724 ident_stack.push(chain_ident);
3725 }
3726
3727 HydroNode::CrossSingleton { right, .. } => {
3728 let right_ident = ident_stack.pop().unwrap();
3729 let left_ident = ident_stack.pop().unwrap();
3730
3731 let cross_ident =
3732 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3733
3734 match builders_or_callback {
3735 BuildersOrCallback::Builders(graph_builders) => {
3736 let builder = graph_builders.get_dfir_mut(&out_location);
3737
3738 if right.metadata().location_id.is_top_level()
3739 && right.metadata().collection_kind.is_bounded()
3740 {
3741 builder.add_dfir(
3742 parse_quote! {
3743 #cross_ident = cross_singleton::<'static>();
3744 #left_ident -> [input]#cross_ident;
3745 #right_ident -> [single]#cross_ident;
3746 },
3747 None,
3748 Some(&next_stmt_id.to_string()),
3749 );
3750 } else {
3751 builder.add_dfir(
3752 parse_quote! {
3753 #cross_ident = cross_singleton();
3754 #left_ident -> [input]#cross_ident;
3755 #right_ident -> [single]#cross_ident;
3756 },
3757 None,
3758 Some(&next_stmt_id.to_string()),
3759 );
3760 }
3761 }
3762 BuildersOrCallback::Callback(_, node_callback) => {
3763 node_callback(node, next_stmt_id);
3764 }
3765 }
3766
3767 let _ = next_stmt_id.get_and_increment();
3768
3769 ident_stack.push(cross_ident);
3770 }
3771
3772 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3773 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3774 parse_quote!(cross_join_multiset)
3775 } else {
3776 parse_quote!(join_multiset)
3777 };
3778
3779 let (HydroNode::CrossProduct { left, right, .. }
3780 | HydroNode::Join { left, right, .. }) = node
3781 else {
3782 unreachable!()
3783 };
3784
3785 let is_top_level = left.metadata().location_id.is_top_level()
3786 && right.metadata().location_id.is_top_level();
3787 let left_lifetime = if left.metadata().location_id.is_top_level() {
3788 quote!('static)
3789 } else {
3790 quote!('tick)
3791 };
3792
3793 let right_lifetime = if right.metadata().location_id.is_top_level() {
3794 quote!('static)
3795 } else {
3796 quote!('tick)
3797 };
3798
3799 let right_ident = ident_stack.pop().unwrap();
3800 let left_ident = ident_stack.pop().unwrap();
3801
3802 let stream_ident =
3803 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3804
3805 match builders_or_callback {
3806 BuildersOrCallback::Builders(graph_builders) => {
3807 let builder = graph_builders.get_dfir_mut(&out_location);
3808 builder.add_dfir(
3809 if is_top_level {
3810 parse_quote! {
3813 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3814 #left_ident -> [0]#stream_ident;
3815 #right_ident -> [1]#stream_ident;
3816 }
3817 } else {
3818 parse_quote! {
3819 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3820 #left_ident -> [0]#stream_ident;
3821 #right_ident -> [1]#stream_ident;
3822 }
3823 }
3824 ,
3825 None,
3826 Some(&next_stmt_id.to_string()),
3827 );
3828 }
3829 BuildersOrCallback::Callback(_, node_callback) => {
3830 node_callback(node, next_stmt_id);
3831 }
3832 }
3833
3834 let _ = next_stmt_id.get_and_increment();
3835
3836 ident_stack.push(stream_ident);
3837 }
3838
3839 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3840 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3841 parse_quote!(difference)
3842 } else {
3843 parse_quote!(anti_join)
3844 };
3845
3846 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3847 node
3848 else {
3849 unreachable!()
3850 };
3851
3852 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3853 quote!('static)
3854 } else {
3855 quote!('tick)
3856 };
3857
3858 let neg_ident = ident_stack.pop().unwrap();
3859 let pos_ident = ident_stack.pop().unwrap();
3860
3861 let stream_ident =
3862 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3863
3864 match builders_or_callback {
3865 BuildersOrCallback::Builders(graph_builders) => {
3866 let builder = graph_builders.get_dfir_mut(&out_location);
3867 builder.add_dfir(
3868 parse_quote! {
3869 #stream_ident = #operator::<'tick, #neg_lifetime>();
3870 #pos_ident -> [pos]#stream_ident;
3871 #neg_ident -> [neg]#stream_ident;
3872 },
3873 None,
3874 Some(&next_stmt_id.to_string()),
3875 );
3876 }
3877 BuildersOrCallback::Callback(_, node_callback) => {
3878 node_callback(node, next_stmt_id);
3879 }
3880 }
3881
3882 let _ = next_stmt_id.get_and_increment();
3883
3884 ident_stack.push(stream_ident);
3885 }
3886
3887 HydroNode::JoinHalf { .. } => {
3888 let HydroNode::JoinHalf { right, .. } = node else {
3889 unreachable!()
3890 };
3891
3892 assert!(
3893 right.metadata().collection_kind.is_bounded(),
3894 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
3895 right.metadata().collection_kind
3896 );
3897
3898 let build_lifetime = if right.metadata().location_id.is_top_level() {
3899 quote!('static)
3900 } else {
3901 quote!('tick)
3902 };
3903
3904 let build_ident = ident_stack.pop().unwrap();
3905 let probe_ident = ident_stack.pop().unwrap();
3906
3907 let stream_ident =
3908 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3909
3910 match builders_or_callback {
3911 BuildersOrCallback::Builders(graph_builders) => {
3912 let builder = graph_builders.get_dfir_mut(&out_location);
3913 builder.add_dfir(
3914 parse_quote! {
3915 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
3916 #probe_ident -> [probe]#stream_ident;
3917 #build_ident -> [build]#stream_ident;
3918 },
3919 None,
3920 Some(&next_stmt_id.to_string()),
3921 );
3922 }
3923 BuildersOrCallback::Callback(_, node_callback) => {
3924 node_callback(node, next_stmt_id);
3925 }
3926 }
3927
3928 let _ = next_stmt_id.get_and_increment();
3929
3930 ident_stack.push(stream_ident);
3931 }
3932
3933 HydroNode::ResolveFutures { .. } => {
3934 let input_ident = ident_stack.pop().unwrap();
3935
3936 let futures_ident =
3937 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3938
3939 match builders_or_callback {
3940 BuildersOrCallback::Builders(graph_builders) => {
3941 let builder = graph_builders.get_dfir_mut(&out_location);
3942 builder.add_dfir(
3943 parse_quote! {
3944 #futures_ident = #input_ident -> resolve_futures();
3945 },
3946 None,
3947 Some(&next_stmt_id.to_string()),
3948 );
3949 }
3950 BuildersOrCallback::Callback(_, node_callback) => {
3951 node_callback(node, next_stmt_id);
3952 }
3953 }
3954
3955 let _ = next_stmt_id.get_and_increment();
3956
3957 ident_stack.push(futures_ident);
3958 }
3959
3960 HydroNode::ResolveFuturesBlocking { .. } => {
3961 let input_ident = ident_stack.pop().unwrap();
3962
3963 let futures_ident =
3964 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3965
3966 match builders_or_callback {
3967 BuildersOrCallback::Builders(graph_builders) => {
3968 let builder = graph_builders.get_dfir_mut(&out_location);
3969 builder.add_dfir(
3970 parse_quote! {
3971 #futures_ident = #input_ident -> resolve_futures_blocking();
3972 },
3973 None,
3974 Some(&next_stmt_id.to_string()),
3975 );
3976 }
3977 BuildersOrCallback::Callback(_, node_callback) => {
3978 node_callback(node, next_stmt_id);
3979 }
3980 }
3981
3982 let _ = next_stmt_id.get_and_increment();
3983
3984 ident_stack.push(futures_ident);
3985 }
3986
3987 HydroNode::ResolveFuturesOrdered { .. } => {
3988 let input_ident = ident_stack.pop().unwrap();
3989
3990 let futures_ident =
3991 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3992
3993 match builders_or_callback {
3994 BuildersOrCallback::Builders(graph_builders) => {
3995 let builder = graph_builders.get_dfir_mut(&out_location);
3996 builder.add_dfir(
3997 parse_quote! {
3998 #futures_ident = #input_ident -> resolve_futures_ordered();
3999 },
4000 None,
4001 Some(&next_stmt_id.to_string()),
4002 );
4003 }
4004 BuildersOrCallback::Callback(_, node_callback) => {
4005 node_callback(node, next_stmt_id);
4006 }
4007 }
4008
4009 let _ = next_stmt_id.get_and_increment();
4010
4011 ident_stack.push(futures_ident);
4012 }
4013
4014 HydroNode::Map { f, .. } => {
4015 let input_ident = ident_stack.pop().unwrap();
4017 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4018
4019 let map_ident =
4020 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4021
4022 match builders_or_callback {
4023 BuildersOrCallback::Builders(graph_builders) => {
4024 let builder = graph_builders.get_dfir_mut(&out_location);
4025 builder.add_dfir(
4026 parse_quote! {
4027 #map_ident = #input_ident -> map(#f_tokens);
4028 },
4029 None,
4030 Some(&next_stmt_id.to_string()),
4031 );
4032 }
4033 BuildersOrCallback::Callback(_, node_callback) => {
4034 node_callback(node, next_stmt_id);
4035 }
4036 }
4037
4038 let _ = next_stmt_id.get_and_increment();
4039
4040 ident_stack.push(map_ident);
4041 }
4042
4043 HydroNode::FlatMap { f, .. } => {
4044 let input_ident = ident_stack.pop().unwrap();
4045 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4046
4047 let flat_map_ident =
4048 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4049
4050 match builders_or_callback {
4051 BuildersOrCallback::Builders(graph_builders) => {
4052 let builder = graph_builders.get_dfir_mut(&out_location);
4053 builder.add_dfir(
4054 parse_quote! {
4055 #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4056 },
4057 None,
4058 Some(&next_stmt_id.to_string()),
4059 );
4060 }
4061 BuildersOrCallback::Callback(_, node_callback) => {
4062 node_callback(node, next_stmt_id);
4063 }
4064 }
4065
4066 let _ = next_stmt_id.get_and_increment();
4067
4068 ident_stack.push(flat_map_ident);
4069 }
4070
4071 HydroNode::FlatMapStreamBlocking { f, .. } => {
4072 let input_ident = ident_stack.pop().unwrap();
4073 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4074
4075 let flat_map_stream_blocking_ident =
4076 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4077
4078 match builders_or_callback {
4079 BuildersOrCallback::Builders(graph_builders) => {
4080 let builder = graph_builders.get_dfir_mut(&out_location);
4081 builder.add_dfir(
4082 parse_quote! {
4083 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4084 },
4085 None,
4086 Some(&next_stmt_id.to_string()),
4087 );
4088 }
4089 BuildersOrCallback::Callback(_, node_callback) => {
4090 node_callback(node, next_stmt_id);
4091 }
4092 }
4093
4094 let _ = next_stmt_id.get_and_increment();
4095
4096 ident_stack.push(flat_map_stream_blocking_ident);
4097 }
4098
4099 HydroNode::Filter { f, .. } => {
4100 let input_ident = ident_stack.pop().unwrap();
4101 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4102
4103 let filter_ident =
4104 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4105
4106 match builders_or_callback {
4107 BuildersOrCallback::Builders(graph_builders) => {
4108 let builder = graph_builders.get_dfir_mut(&out_location);
4109 builder.add_dfir(
4110 parse_quote! {
4111 #filter_ident = #input_ident -> filter(#f_tokens);
4112 },
4113 None,
4114 Some(&next_stmt_id.to_string()),
4115 );
4116 }
4117 BuildersOrCallback::Callback(_, node_callback) => {
4118 node_callback(node, next_stmt_id);
4119 }
4120 }
4121
4122 let _ = next_stmt_id.get_and_increment();
4123
4124 ident_stack.push(filter_ident);
4125 }
4126
4127 HydroNode::FilterMap { f, .. } => {
4128 let input_ident = ident_stack.pop().unwrap();
4129 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4130
4131 let filter_map_ident =
4132 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4133
4134 match builders_or_callback {
4135 BuildersOrCallback::Builders(graph_builders) => {
4136 let builder = graph_builders.get_dfir_mut(&out_location);
4137 builder.add_dfir(
4138 parse_quote! {
4139 #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4140 },
4141 None,
4142 Some(&next_stmt_id.to_string()),
4143 );
4144 }
4145 BuildersOrCallback::Callback(_, node_callback) => {
4146 node_callback(node, next_stmt_id);
4147 }
4148 }
4149
4150 let _ = next_stmt_id.get_and_increment();
4151
4152 ident_stack.push(filter_map_ident);
4153 }
4154
4155 HydroNode::Sort { .. } => {
4156 let input_ident = ident_stack.pop().unwrap();
4157
4158 let sort_ident =
4159 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4160
4161 match builders_or_callback {
4162 BuildersOrCallback::Builders(graph_builders) => {
4163 let builder = graph_builders.get_dfir_mut(&out_location);
4164 builder.add_dfir(
4165 parse_quote! {
4166 #sort_ident = #input_ident -> sort();
4167 },
4168 None,
4169 Some(&next_stmt_id.to_string()),
4170 );
4171 }
4172 BuildersOrCallback::Callback(_, node_callback) => {
4173 node_callback(node, next_stmt_id);
4174 }
4175 }
4176
4177 let _ = next_stmt_id.get_and_increment();
4178
4179 ident_stack.push(sort_ident);
4180 }
4181
4182 HydroNode::DeferTick { .. } => {
4183 let input_ident = ident_stack.pop().unwrap();
4184
4185 let defer_tick_ident =
4186 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4187
4188 match builders_or_callback {
4189 BuildersOrCallback::Builders(graph_builders) => {
4190 let builder = graph_builders.get_dfir_mut(&out_location);
4191 builder.add_dfir(
4192 parse_quote! {
4193 #defer_tick_ident = #input_ident -> defer_tick_lazy();
4194 },
4195 None,
4196 Some(&next_stmt_id.to_string()),
4197 );
4198 }
4199 BuildersOrCallback::Callback(_, node_callback) => {
4200 node_callback(node, next_stmt_id);
4201 }
4202 }
4203
4204 let _ = next_stmt_id.get_and_increment();
4205
4206 ident_stack.push(defer_tick_ident);
4207 }
4208
4209 HydroNode::Enumerate { input, .. } => {
4210 let input_ident = ident_stack.pop().unwrap();
4211
4212 let enumerate_ident =
4213 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4214
4215 match builders_or_callback {
4216 BuildersOrCallback::Builders(graph_builders) => {
4217 let builder = graph_builders.get_dfir_mut(&out_location);
4218 let lifetime = if input.metadata().location_id.is_top_level() {
4219 quote!('static)
4220 } else {
4221 quote!('tick)
4222 };
4223 builder.add_dfir(
4224 parse_quote! {
4225 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4226 },
4227 None,
4228 Some(&next_stmt_id.to_string()),
4229 );
4230 }
4231 BuildersOrCallback::Callback(_, node_callback) => {
4232 node_callback(node, next_stmt_id);
4233 }
4234 }
4235
4236 let _ = next_stmt_id.get_and_increment();
4237
4238 ident_stack.push(enumerate_ident);
4239 }
4240
4241 HydroNode::Inspect { f, .. } => {
4242 let input_ident = ident_stack.pop().unwrap();
4243 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4244
4245 let inspect_ident =
4246 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4247
4248 match builders_or_callback {
4249 BuildersOrCallback::Builders(graph_builders) => {
4250 let builder = graph_builders.get_dfir_mut(&out_location);
4251 builder.add_dfir(
4252 parse_quote! {
4253 #inspect_ident = #input_ident -> inspect(#f_tokens);
4254 },
4255 None,
4256 Some(&next_stmt_id.to_string()),
4257 );
4258 }
4259 BuildersOrCallback::Callback(_, node_callback) => {
4260 node_callback(node, next_stmt_id);
4261 }
4262 }
4263
4264 let _ = next_stmt_id.get_and_increment();
4265
4266 ident_stack.push(inspect_ident);
4267 }
4268
4269 HydroNode::Unique { input, .. } => {
4270 let input_ident = ident_stack.pop().unwrap();
4271
4272 let unique_ident =
4273 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4274
4275 match builders_or_callback {
4276 BuildersOrCallback::Builders(graph_builders) => {
4277 let builder = graph_builders.get_dfir_mut(&out_location);
4278 let lifetime = if input.metadata().location_id.is_top_level() {
4279 quote!('static)
4280 } else {
4281 quote!('tick)
4282 };
4283
4284 builder.add_dfir(
4285 parse_quote! {
4286 #unique_ident = #input_ident -> unique::<#lifetime>();
4287 },
4288 None,
4289 Some(&next_stmt_id.to_string()),
4290 );
4291 }
4292 BuildersOrCallback::Callback(_, node_callback) => {
4293 node_callback(node, next_stmt_id);
4294 }
4295 }
4296
4297 let _ = next_stmt_id.get_and_increment();
4298
4299 ident_stack.push(unique_ident);
4300 }
4301
4302 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
4303 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
4304 if input.metadata().location_id.is_top_level()
4305 && input.metadata().collection_kind.is_bounded()
4306 {
4307 parse_quote!(fold_no_replay)
4308 } else {
4309 parse_quote!(fold)
4310 }
4311 } else if matches!(node, HydroNode::Scan { .. }) {
4312 parse_quote!(scan)
4313 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
4314 parse_quote!(scan_async_blocking)
4315 } else if let HydroNode::FoldKeyed { input, .. } = node {
4316 if input.metadata().location_id.is_top_level()
4317 && input.metadata().collection_kind.is_bounded()
4318 {
4319 todo!("Fold keyed on a top-level bounded collection is not yet supported")
4320 } else {
4321 parse_quote!(fold_keyed)
4322 }
4323 } else {
4324 unreachable!()
4325 };
4326
4327 let (HydroNode::Fold { input, .. }
4328 | HydroNode::FoldKeyed { input, .. }
4329 | HydroNode::Scan { input, .. }
4330 | HydroNode::ScanAsyncBlocking { input, .. }) = node
4331 else {
4332 unreachable!()
4333 };
4334
4335 let lifetime = if input.metadata().location_id.is_top_level() {
4336 quote!('static)
4337 } else {
4338 quote!('tick)
4339 };
4340
4341 let input_ident = ident_stack.pop().unwrap();
4342
4343 let (HydroNode::Fold { init, acc, .. }
4344 | HydroNode::FoldKeyed { init, acc, .. }
4345 | HydroNode::Scan { init, acc, .. }
4346 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
4347 else {
4348 unreachable!()
4349 };
4350
4351 let acc_tokens = acc.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4352 let init_tokens = init.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4353
4354 let fold_ident =
4355 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4356
4357 match builders_or_callback {
4358 BuildersOrCallback::Builders(graph_builders) => {
4359 if matches!(node, HydroNode::Fold { .. })
4360 && node.metadata().location_id.is_top_level()
4361 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4362 && graph_builders.singleton_intermediates()
4363 && !node.metadata().collection_kind.is_bounded()
4364 {
4365 let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
4366 let hooked_input_ident = graph_builders.emit_fold_hook(
4367 &input.metadata().location_id,
4368 &input_ident,
4369 &input.metadata().collection_kind,
4370 &node.metadata().op,
4371 );
4372
4373 let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
4374 let acc: syn::Expr = parse_quote!({
4375 let mut __inner = #acc_tokens;
4376 move |__state, __batch: Vec<_>| {
4377 if __batch.is_empty() {
4378 return None;
4379 }
4380 for __value in __batch {
4381 __inner(__state, __value);
4382 }
4383 Some(__state.clone())
4384 }
4385 });
4386 (hooked, acc)
4387 } else {
4388 let acc: syn::Expr = parse_quote!({
4389 let mut __inner = #acc_tokens;
4390 move |__state, __value| {
4391 __inner(__state, __value);
4392 Some(__state.clone())
4393 }
4394 });
4395 (&input_ident, acc)
4396 };
4397
4398 let builder = graph_builders.get_dfir_mut(&out_location);
4399 builder.add_dfir(
4400 parse_quote! {
4401 source_iter([(#init_tokens)()]) -> [0]#fold_ident;
4402 #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
4403 #fold_ident = chain();
4404 },
4405 None,
4406 Some(&next_stmt_id.to_string()),
4407 );
4408
4409 if hooked_input_ident.is_some() {
4410 fold_hooked_idents.insert(fold_ident.to_string());
4411 }
4412 } else if matches!(node, HydroNode::FoldKeyed { .. })
4413 && node.metadata().location_id.is_top_level()
4414 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4415 && graph_builders.singleton_intermediates()
4416 && !node.metadata().collection_kind.is_bounded()
4417 {
4418 let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
4419 let hooked_input_ident = graph_builders.emit_fold_hook(
4420 &input.metadata().location_id,
4421 &input_ident,
4422 &input.metadata().collection_kind,
4423 &node.metadata().op,
4424 );
4425 let builder = graph_builders.get_dfir_mut(&out_location);
4426
4427 let wrapped_acc: syn::Expr = parse_quote!({
4428 let mut __init = #init_tokens;
4429 let mut __inner = #acc_tokens;
4430 move |__state, __kv: (_, _)| {
4431 let __state = __state
4433 .entry(::std::clone::Clone::clone(&__kv.0))
4434 .or_insert_with(|| (__init)());
4435 __inner(__state, __kv.1);
4436 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
4437 }
4438 });
4439
4440 if let Some(hooked_input_ident) = hooked_input_ident {
4441 builder.add_dfir(
4442 parse_quote! {
4443 #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4444 },
4445 None,
4446 Some(&next_stmt_id.to_string()),
4447 );
4448
4449 fold_hooked_idents.insert(fold_ident.to_string());
4450 } else {
4451 builder.add_dfir(
4452 parse_quote! {
4453 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
4454 },
4455 None,
4456 Some(&next_stmt_id.to_string()),
4457 );
4458 }
4459 } else if (matches!(node, HydroNode::Fold { .. })
4460 || matches!(node, HydroNode::FoldKeyed { .. }))
4461 && !node.metadata().location_id.is_top_level()
4462 && graph_builders.singleton_intermediates()
4463 {
4464 let input_ref = match &*node {
4465 HydroNode::Fold { input, .. } => input,
4466 HydroNode::FoldKeyed { input, .. } => input,
4467 _ => unreachable!(),
4468 };
4469 let hooked_input_ident = graph_builders.emit_fold_hook(
4470 &input_ref.metadata().location_id,
4471 &input_ident,
4472 &input_ref.metadata().collection_kind,
4473 &node.metadata().op,
4474 );
4475
4476 let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
4477 let builder = graph_builders.get_dfir_mut(&out_location);
4478 builder.add_dfir(
4479 parse_quote! {
4480 #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4481 },
4482 None,
4483 Some(&next_stmt_id.to_string()),
4484 );
4485 } else {
4486 let builder = graph_builders.get_dfir_mut(&out_location);
4487 builder.add_dfir(
4488 parse_quote! {
4489 #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
4490 },
4491 None,
4492 Some(&next_stmt_id.to_string()),
4493 );
4494 }
4495 }
4496 BuildersOrCallback::Callback(_, node_callback) => {
4497 node_callback(node, next_stmt_id);
4498 }
4499 }
4500
4501 let _ = next_stmt_id.get_and_increment();
4502
4503 ident_stack.push(fold_ident);
4504 }
4505
4506 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
4507 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
4508 if input.metadata().location_id.is_top_level()
4509 && input.metadata().collection_kind.is_bounded()
4510 {
4511 parse_quote!(reduce_no_replay)
4512 } else {
4513 parse_quote!(reduce)
4514 }
4515 } else if let HydroNode::ReduceKeyed { input, .. } = node {
4516 if input.metadata().location_id.is_top_level()
4517 && input.metadata().collection_kind.is_bounded()
4518 {
4519 todo!(
4520 "Calling keyed reduce on a top-level bounded collection is not supported"
4521 )
4522 } else {
4523 parse_quote!(reduce_keyed)
4524 }
4525 } else {
4526 unreachable!()
4527 };
4528
4529 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
4530 else {
4531 unreachable!()
4532 };
4533
4534 let lifetime = if input.metadata().location_id.is_top_level() {
4535 quote!('static)
4536 } else {
4537 quote!('tick)
4538 };
4539
4540 let input_ident = ident_stack.pop().unwrap();
4541
4542 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
4543 else {
4544 unreachable!()
4545 };
4546
4547 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4548
4549 let reduce_ident =
4550 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4551
4552 match builders_or_callback {
4553 BuildersOrCallback::Builders(graph_builders) => {
4554 if matches!(node, HydroNode::Reduce { .. })
4555 && node.metadata().location_id.is_top_level()
4556 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4557 && graph_builders.singleton_intermediates()
4558 && !node.metadata().collection_kind.is_bounded()
4559 {
4560 todo!(
4561 "Reduce with optional intermediates is not yet supported in simulator"
4562 );
4563 } else if matches!(node, HydroNode::ReduceKeyed { .. })
4564 && node.metadata().location_id.is_top_level()
4565 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
4566 && graph_builders.singleton_intermediates()
4567 && !node.metadata().collection_kind.is_bounded()
4568 {
4569 todo!(
4570 "Reduce keyed with optional intermediates is not yet supported in simulator"
4571 );
4572 } else {
4573 let builder = graph_builders.get_dfir_mut(&out_location);
4574 builder.add_dfir(
4575 parse_quote! {
4576 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
4577 },
4578 None,
4579 Some(&next_stmt_id.to_string()),
4580 );
4581 }
4582 }
4583 BuildersOrCallback::Callback(_, node_callback) => {
4584 node_callback(node, next_stmt_id);
4585 }
4586 }
4587
4588 let _ = next_stmt_id.get_and_increment();
4589
4590 ident_stack.push(reduce_ident);
4591 }
4592
4593 HydroNode::ReduceKeyedWatermark {
4594 f,
4595 input,
4596 metadata,
4597 ..
4598 } => {
4599 let lifetime = if input.metadata().location_id.is_top_level() {
4600 quote!('static)
4601 } else {
4602 quote!('tick)
4603 };
4604
4605 let watermark_ident = ident_stack.pop().unwrap();
4607 let input_ident = ident_stack.pop().unwrap();
4608 let f_tokens = f.emit_tokens(&mut ident_stack, &mut singleton_access_counters);
4609
4610 let chain_ident = syn::Ident::new(
4611 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
4612 Span::call_site(),
4613 );
4614
4615 let fold_ident =
4616 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4617
4618 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4619 && input.metadata().collection_kind.is_bounded()
4620 {
4621 parse_quote!(fold_no_replay)
4622 } else {
4623 parse_quote!(fold)
4624 };
4625
4626 match builders_or_callback {
4627 BuildersOrCallback::Builders(graph_builders) => {
4628 if metadata.location_id.is_top_level()
4629 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4630 && graph_builders.singleton_intermediates()
4631 && !metadata.collection_kind.is_bounded()
4632 {
4633 todo!(
4634 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4635 )
4636 } else {
4637 let builder = graph_builders.get_dfir_mut(&out_location);
4638 builder.add_dfir(
4639 parse_quote! {
4640 #chain_ident = chain();
4641 #input_ident
4642 -> map(|x| (Some(x), None))
4643 -> [0]#chain_ident;
4644 #watermark_ident
4645 -> map(|watermark| (None, Some(watermark)))
4646 -> [1]#chain_ident;
4647
4648 #fold_ident = #chain_ident
4649 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4650 let __reduce_keyed_fn = #f_tokens;
4651 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4652 if let Some((k, v)) = opt_payload {
4653 if let Some(curr_watermark) = *opt_curr_watermark {
4654 if k < curr_watermark {
4655 return;
4656 }
4657 }
4658 match map.entry(k) {
4659 ::std::collections::hash_map::Entry::Vacant(e) => {
4660 e.insert(v);
4661 }
4662 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4663 __reduce_keyed_fn(e.get_mut(), v);
4664 }
4665 }
4666 } else {
4667 let watermark = opt_watermark.unwrap();
4668 if let Some(curr_watermark) = *opt_curr_watermark {
4669 if watermark <= curr_watermark {
4670 return;
4671 }
4672 }
4673 map.retain(|k, _| *k >= watermark);
4674 *opt_curr_watermark = Some(watermark);
4675 }
4676 }
4677 })
4678 -> flat_map(|(map, _curr_watermark)| map);
4679 },
4680 None,
4681 Some(&next_stmt_id.to_string()),
4682 );
4683 }
4684 }
4685 BuildersOrCallback::Callback(_, node_callback) => {
4686 node_callback(node, next_stmt_id);
4687 }
4688 }
4689
4690 let _ = next_stmt_id.get_and_increment();
4691
4692 ident_stack.push(fold_ident);
4693 }
4694
4695 HydroNode::Network {
4696 networking_info,
4697 serialize_fn: serialize_pipeline,
4698 instantiate_fn,
4699 deserialize_fn: deserialize_pipeline,
4700 input,
4701 ..
4702 } => {
4703 let input_ident = ident_stack.pop().unwrap();
4704
4705 let receiver_stream_ident =
4706 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4707
4708 match builders_or_callback {
4709 BuildersOrCallback::Builders(graph_builders) => {
4710 let (sink_expr, source_expr) = match instantiate_fn {
4711 DebugInstantiate::Building => (
4712 syn::parse_quote!(DUMMY_SINK),
4713 syn::parse_quote!(DUMMY_SOURCE),
4714 ),
4715
4716 DebugInstantiate::Finalized(finalized) => {
4717 (finalized.sink.clone(), finalized.source.clone())
4718 }
4719 };
4720
4721 graph_builders.create_network(
4722 &input.metadata().location_id,
4723 &out_location,
4724 input_ident,
4725 &receiver_stream_ident,
4726 serialize_pipeline.as_ref(),
4727 sink_expr,
4728 source_expr,
4729 deserialize_pipeline.as_ref(),
4730 *next_stmt_id,
4731 networking_info,
4732 );
4733 }
4734 BuildersOrCallback::Callback(_, node_callback) => {
4735 node_callback(node, next_stmt_id);
4736 }
4737 }
4738
4739 let _ = next_stmt_id.get_and_increment();
4740
4741 ident_stack.push(receiver_stream_ident);
4742 }
4743
4744 HydroNode::ExternalInput {
4745 instantiate_fn,
4746 deserialize_fn: deserialize_pipeline,
4747 ..
4748 } => {
4749 let receiver_stream_ident =
4750 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4751
4752 match builders_or_callback {
4753 BuildersOrCallback::Builders(graph_builders) => {
4754 let (_, source_expr) = match instantiate_fn {
4755 DebugInstantiate::Building => (
4756 syn::parse_quote!(DUMMY_SINK),
4757 syn::parse_quote!(DUMMY_SOURCE),
4758 ),
4759
4760 DebugInstantiate::Finalized(finalized) => {
4761 (finalized.sink.clone(), finalized.source.clone())
4762 }
4763 };
4764
4765 graph_builders.create_external_source(
4766 &out_location,
4767 source_expr,
4768 &receiver_stream_ident,
4769 deserialize_pipeline.as_ref(),
4770 *next_stmt_id,
4771 );
4772 }
4773 BuildersOrCallback::Callback(_, node_callback) => {
4774 node_callback(node, next_stmt_id);
4775 }
4776 }
4777
4778 let _ = next_stmt_id.get_and_increment();
4779
4780 ident_stack.push(receiver_stream_ident);
4781 }
4782
4783 HydroNode::Counter {
4784 tag,
4785 duration,
4786 prefix,
4787 ..
4788 } => {
4789 let input_ident = ident_stack.pop().unwrap();
4790
4791 let counter_ident =
4792 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
4793
4794 match builders_or_callback {
4795 BuildersOrCallback::Builders(graph_builders) => {
4796 let arg = format!("{}({})", prefix, tag);
4797 let builder = graph_builders.get_dfir_mut(&out_location);
4798 builder.add_dfir(
4799 parse_quote! {
4800 #counter_ident = #input_ident -> _counter(#arg, #duration);
4801 },
4802 None,
4803 Some(&next_stmt_id.to_string()),
4804 );
4805 }
4806 BuildersOrCallback::Callback(_, node_callback) => {
4807 node_callback(node, next_stmt_id);
4808 }
4809 }
4810
4811 let _ = next_stmt_id.get_and_increment();
4812
4813 ident_stack.push(counter_ident);
4814 }
4815 }
4816 },
4817 seen_tees,
4818 false,
4819 );
4820
4821 let ret = ident_stack
4822 .pop()
4823 .expect("ident_stack should have exactly one element after traversal");
4824 assert!(
4825 ident_stack.is_empty(),
4826 "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
4827 This indicates a bug in the code gen: some node pushed idents that were never consumed.",
4828 ident_stack.len()
4829 );
4830 ret
4831 }
4832
4833 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4834 match self {
4835 HydroNode::Placeholder => {
4836 panic!()
4837 }
4838 HydroNode::Cast { .. }
4839 | HydroNode::ObserveNonDet { .. }
4840 | HydroNode::UnboundSingleton { .. }
4841 | HydroNode::AssertIsConsistent { .. } => {}
4842 HydroNode::Source { source, .. } => match source {
4843 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4844 HydroSource::ExternalNetwork()
4845 | HydroSource::Spin()
4846 | HydroSource::ClusterMembers(_, _)
4847 | HydroSource::Embedded(_)
4848 | HydroSource::EmbeddedSingleton(_) => {} },
4850 HydroNode::SingletonSource { value, .. } => {
4851 transform(value);
4852 }
4853 HydroNode::CycleSource { .. }
4854 | HydroNode::Tee { .. }
4855 | HydroNode::Reference { .. }
4856 | HydroNode::YieldConcat { .. }
4857 | HydroNode::BeginAtomic { .. }
4858 | HydroNode::EndAtomic { .. }
4859 | HydroNode::Batch { .. }
4860 | HydroNode::Chain { .. }
4861 | HydroNode::MergeOrdered { .. }
4862 | HydroNode::ChainFirst { .. }
4863 | HydroNode::CrossProduct { .. }
4864 | HydroNode::CrossSingleton { .. }
4865 | HydroNode::ResolveFutures { .. }
4866 | HydroNode::ResolveFuturesBlocking { .. }
4867 | HydroNode::ResolveFuturesOrdered { .. }
4868 | HydroNode::Join { .. }
4869 | HydroNode::JoinHalf { .. }
4870 | HydroNode::Difference { .. }
4871 | HydroNode::AntiJoin { .. }
4872 | HydroNode::DeferTick { .. }
4873 | HydroNode::Enumerate { .. }
4874 | HydroNode::Unique { .. }
4875 | HydroNode::Sort { .. } => {}
4876 HydroNode::Map { f, .. }
4877 | HydroNode::FlatMap { f, .. }
4878 | HydroNode::FlatMapStreamBlocking { f, .. }
4879 | HydroNode::Filter { f, .. }
4880 | HydroNode::FilterMap { f, .. }
4881 | HydroNode::Inspect { f, .. }
4882 | HydroNode::Partition { f, .. }
4883 | HydroNode::Reduce { f, .. }
4884 | HydroNode::ReduceKeyed { f, .. }
4885 | HydroNode::ReduceKeyedWatermark { f, .. } => {
4886 transform(&mut f.expr);
4887 }
4888 HydroNode::Fold { init, acc, .. }
4889 | HydroNode::Scan { init, acc, .. }
4890 | HydroNode::ScanAsyncBlocking { init, acc, .. }
4891 | HydroNode::FoldKeyed { init, acc, .. } => {
4892 transform(&mut init.expr);
4893 transform(&mut acc.expr);
4894 }
4895 HydroNode::Network {
4896 serialize_fn,
4897 deserialize_fn,
4898 ..
4899 } => {
4900 if let Some(serialize_fn) = serialize_fn {
4901 transform(serialize_fn);
4902 }
4903 if let Some(deserialize_fn) = deserialize_fn {
4904 transform(deserialize_fn);
4905 }
4906 }
4907 HydroNode::ExternalInput { deserialize_fn, .. } => {
4908 if let Some(deserialize_fn) = deserialize_fn {
4909 transform(deserialize_fn);
4910 }
4911 }
4912 HydroNode::Counter { duration, .. } => {
4913 transform(duration);
4914 }
4915 }
4916 }
4917
4918 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4919 &self.metadata().op
4920 }
4921
4922 pub fn metadata(&self) -> &HydroIrMetadata {
4923 match self {
4924 HydroNode::Placeholder => {
4925 panic!()
4926 }
4927 HydroNode::Cast { metadata, .. }
4928 | HydroNode::ObserveNonDet { metadata, .. }
4929 | HydroNode::AssertIsConsistent { metadata, .. }
4930 | HydroNode::UnboundSingleton { metadata, .. }
4931 | HydroNode::Source { metadata, .. }
4932 | HydroNode::SingletonSource { metadata, .. }
4933 | HydroNode::CycleSource { metadata, .. }
4934 | HydroNode::Tee { metadata, .. }
4935 | HydroNode::Reference { metadata, .. }
4936 | HydroNode::Partition { metadata, .. }
4937 | HydroNode::YieldConcat { metadata, .. }
4938 | HydroNode::BeginAtomic { metadata, .. }
4939 | HydroNode::EndAtomic { metadata, .. }
4940 | HydroNode::Batch { metadata, .. }
4941 | HydroNode::Chain { metadata, .. }
4942 | HydroNode::MergeOrdered { metadata, .. }
4943 | HydroNode::ChainFirst { metadata, .. }
4944 | HydroNode::CrossProduct { metadata, .. }
4945 | HydroNode::CrossSingleton { metadata, .. }
4946 | HydroNode::Join { metadata, .. }
4947 | HydroNode::JoinHalf { metadata, .. }
4948 | HydroNode::Difference { metadata, .. }
4949 | HydroNode::AntiJoin { metadata, .. }
4950 | HydroNode::ResolveFutures { metadata, .. }
4951 | HydroNode::ResolveFuturesBlocking { metadata, .. }
4952 | HydroNode::ResolveFuturesOrdered { metadata, .. }
4953 | HydroNode::Map { metadata, .. }
4954 | HydroNode::FlatMap { metadata, .. }
4955 | HydroNode::FlatMapStreamBlocking { metadata, .. }
4956 | HydroNode::Filter { metadata, .. }
4957 | HydroNode::FilterMap { metadata, .. }
4958 | HydroNode::DeferTick { metadata, .. }
4959 | HydroNode::Enumerate { metadata, .. }
4960 | HydroNode::Inspect { metadata, .. }
4961 | HydroNode::Unique { metadata, .. }
4962 | HydroNode::Sort { metadata, .. }
4963 | HydroNode::Scan { metadata, .. }
4964 | HydroNode::ScanAsyncBlocking { metadata, .. }
4965 | HydroNode::Fold { metadata, .. }
4966 | HydroNode::FoldKeyed { metadata, .. }
4967 | HydroNode::Reduce { metadata, .. }
4968 | HydroNode::ReduceKeyed { metadata, .. }
4969 | HydroNode::ReduceKeyedWatermark { metadata, .. }
4970 | HydroNode::ExternalInput { metadata, .. }
4971 | HydroNode::Network { metadata, .. }
4972 | HydroNode::Counter { metadata, .. } => metadata,
4973 }
4974 }
4975
4976 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4977 &mut self.metadata_mut().op
4978 }
4979
4980 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4981 match self {
4982 HydroNode::Placeholder => {
4983 panic!()
4984 }
4985 HydroNode::Cast { metadata, .. }
4986 | HydroNode::ObserveNonDet { metadata, .. }
4987 | HydroNode::AssertIsConsistent { metadata, .. }
4988 | HydroNode::UnboundSingleton { metadata, .. }
4989 | HydroNode::Source { metadata, .. }
4990 | HydroNode::SingletonSource { metadata, .. }
4991 | HydroNode::CycleSource { metadata, .. }
4992 | HydroNode::Tee { metadata, .. }
4993 | HydroNode::Reference { metadata, .. }
4994 | HydroNode::Partition { metadata, .. }
4995 | HydroNode::YieldConcat { metadata, .. }
4996 | HydroNode::BeginAtomic { metadata, .. }
4997 | HydroNode::EndAtomic { metadata, .. }
4998 | HydroNode::Batch { metadata, .. }
4999 | HydroNode::Chain { metadata, .. }
5000 | HydroNode::MergeOrdered { metadata, .. }
5001 | HydroNode::ChainFirst { metadata, .. }
5002 | HydroNode::CrossProduct { metadata, .. }
5003 | HydroNode::CrossSingleton { metadata, .. }
5004 | HydroNode::Join { metadata, .. }
5005 | HydroNode::JoinHalf { metadata, .. }
5006 | HydroNode::Difference { metadata, .. }
5007 | HydroNode::AntiJoin { metadata, .. }
5008 | HydroNode::ResolveFutures { metadata, .. }
5009 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5010 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5011 | HydroNode::Map { metadata, .. }
5012 | HydroNode::FlatMap { metadata, .. }
5013 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5014 | HydroNode::Filter { metadata, .. }
5015 | HydroNode::FilterMap { metadata, .. }
5016 | HydroNode::DeferTick { metadata, .. }
5017 | HydroNode::Enumerate { metadata, .. }
5018 | HydroNode::Inspect { metadata, .. }
5019 | HydroNode::Unique { metadata, .. }
5020 | HydroNode::Sort { metadata, .. }
5021 | HydroNode::Scan { metadata, .. }
5022 | HydroNode::ScanAsyncBlocking { metadata, .. }
5023 | HydroNode::Fold { metadata, .. }
5024 | HydroNode::FoldKeyed { metadata, .. }
5025 | HydroNode::Reduce { metadata, .. }
5026 | HydroNode::ReduceKeyed { metadata, .. }
5027 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5028 | HydroNode::ExternalInput { metadata, .. }
5029 | HydroNode::Network { metadata, .. }
5030 | HydroNode::Counter { metadata, .. } => metadata,
5031 }
5032 }
5033
5034 pub fn input(&self) -> Vec<&HydroNode> {
5035 match self {
5036 HydroNode::Placeholder => {
5037 panic!()
5038 }
5039 HydroNode::Source { .. }
5040 | HydroNode::SingletonSource { .. }
5041 | HydroNode::ExternalInput { .. }
5042 | HydroNode::CycleSource { .. }
5043 | HydroNode::Tee { .. }
5044 | HydroNode::Reference { .. }
5045 | HydroNode::Partition { .. } => {
5046 vec![]
5048 }
5049 HydroNode::Cast { inner, .. }
5050 | HydroNode::ObserveNonDet { inner, .. }
5051 | HydroNode::YieldConcat { inner, .. }
5052 | HydroNode::BeginAtomic { inner, .. }
5053 | HydroNode::EndAtomic { inner, .. }
5054 | HydroNode::Batch { inner, .. }
5055 | HydroNode::UnboundSingleton { inner, .. }
5056 | HydroNode::AssertIsConsistent { inner, .. } => {
5057 vec![inner]
5058 }
5059 HydroNode::Chain { first, second, .. } => {
5060 vec![first, second]
5061 }
5062 HydroNode::MergeOrdered { first, second, .. } => {
5063 vec![first, second]
5064 }
5065 HydroNode::ChainFirst { first, second, .. } => {
5066 vec![first, second]
5067 }
5068 HydroNode::CrossProduct { left, right, .. }
5069 | HydroNode::CrossSingleton { left, right, .. }
5070 | HydroNode::Join { left, right, .. }
5071 | HydroNode::JoinHalf { left, right, .. } => {
5072 vec![left, right]
5073 }
5074 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5075 vec![pos, neg]
5076 }
5077 HydroNode::Map { input, .. }
5078 | HydroNode::FlatMap { input, .. }
5079 | HydroNode::FlatMapStreamBlocking { input, .. }
5080 | HydroNode::Filter { input, .. }
5081 | HydroNode::FilterMap { input, .. }
5082 | HydroNode::Sort { input, .. }
5083 | HydroNode::DeferTick { input, .. }
5084 | HydroNode::Enumerate { input, .. }
5085 | HydroNode::Inspect { input, .. }
5086 | HydroNode::Unique { input, .. }
5087 | HydroNode::Network { input, .. }
5088 | HydroNode::Counter { input, .. }
5089 | HydroNode::ResolveFutures { input, .. }
5090 | HydroNode::ResolveFuturesBlocking { input, .. }
5091 | HydroNode::ResolveFuturesOrdered { input, .. }
5092 | HydroNode::Fold { input, .. }
5093 | HydroNode::FoldKeyed { input, .. }
5094 | HydroNode::Reduce { input, .. }
5095 | HydroNode::ReduceKeyed { input, .. }
5096 | HydroNode::Scan { input, .. }
5097 | HydroNode::ScanAsyncBlocking { input, .. } => {
5098 vec![input]
5099 }
5100 HydroNode::ReduceKeyedWatermark {
5101 input, watermark, ..
5102 } => {
5103 vec![input, watermark]
5104 }
5105 }
5106 }
5107
5108 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5109 self.input()
5110 .iter()
5111 .map(|input_node| input_node.metadata())
5112 .collect()
5113 }
5114
5115 pub fn is_shared_with_others(&self) -> bool {
5119 match self {
5120 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5121 Rc::strong_count(&inner.0) > 1
5122 }
5123 HydroNode::Reference { .. } => false,
5126 _ => false,
5127 }
5128 }
5129
5130 pub fn print_root(&self) -> String {
5131 match self {
5132 HydroNode::Placeholder => {
5133 panic!()
5134 }
5135 HydroNode::Cast { .. } => "Cast()".to_owned(),
5136 HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5137 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5138 HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5139 HydroNode::Source { source, .. } => format!("Source({:?})", source),
5140 HydroNode::SingletonSource {
5141 value,
5142 first_tick_only,
5143 ..
5144 } => format!(
5145 "SingletonSource({:?}, first_tick_only={})",
5146 value, first_tick_only
5147 ),
5148 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5149 HydroNode::Tee { inner, .. } => {
5150 format!("Tee({})", inner.0.borrow().print_root())
5151 }
5152 HydroNode::Reference { inner, kind, .. } => {
5153 format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5154 }
5155 HydroNode::Partition { f, is_true, .. } => {
5156 format!("Partition({:?}, is_true={})", f, is_true)
5157 }
5158 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5159 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5160 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5161 HydroNode::Batch { .. } => "Batch()".to_owned(),
5162 HydroNode::Chain { first, second, .. } => {
5163 format!("Chain({}, {})", first.print_root(), second.print_root())
5164 }
5165 HydroNode::MergeOrdered { first, second, .. } => {
5166 format!(
5167 "MergeOrdered({}, {})",
5168 first.print_root(),
5169 second.print_root()
5170 )
5171 }
5172 HydroNode::ChainFirst { first, second, .. } => {
5173 format!(
5174 "ChainFirst({}, {})",
5175 first.print_root(),
5176 second.print_root()
5177 )
5178 }
5179 HydroNode::CrossProduct { left, right, .. } => {
5180 format!(
5181 "CrossProduct({}, {})",
5182 left.print_root(),
5183 right.print_root()
5184 )
5185 }
5186 HydroNode::CrossSingleton { left, right, .. } => {
5187 format!(
5188 "CrossSingleton({}, {})",
5189 left.print_root(),
5190 right.print_root()
5191 )
5192 }
5193 HydroNode::Join { left, right, .. } => {
5194 format!("Join({}, {})", left.print_root(), right.print_root())
5195 }
5196 HydroNode::JoinHalf { left, right, .. } => {
5197 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5198 }
5199 HydroNode::Difference { pos, neg, .. } => {
5200 format!("Difference({}, {})", pos.print_root(), neg.print_root())
5201 }
5202 HydroNode::AntiJoin { pos, neg, .. } => {
5203 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5204 }
5205 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5206 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5207 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5208 HydroNode::Map { f, .. } => format!("Map({:?})", f),
5209 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5210 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5211 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5212 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5213 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5214 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5215 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5216 HydroNode::Unique { .. } => "Unique()".to_owned(),
5217 HydroNode::Sort { .. } => "Sort()".to_owned(),
5218 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5219 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5220 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5221 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5222 }
5223 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5224 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5225 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5226 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5227 HydroNode::Network { .. } => "Network()".to_owned(),
5228 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5229 HydroNode::Counter { tag, duration, .. } => {
5230 format!("Counter({:?}, {:?})", tag, duration)
5231 }
5232 }
5233 }
5234}
5235
5236#[cfg(feature = "build")]
5237fn instantiate_network<'a, D>(
5238 env: &mut D::InstantiateEnv,
5239 from_location: &LocationId,
5240 to_location: &LocationId,
5241 processes: &SparseSecondaryMap<LocationKey, D::Process>,
5242 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
5243 name: Option<&str>,
5244 networking_info: &crate::networking::NetworkingInfo,
5245) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
5246where
5247 D: Deploy<'a>,
5248{
5249 let ((sink, source), connect_fn) = match (from_location, to_location) {
5250 (&LocationId::Process(from), &LocationId::Process(to)) => {
5251 let from_node = processes
5252 .get(from)
5253 .unwrap_or_else(|| {
5254 panic!("A process used in the graph was not instantiated: {}", from)
5255 })
5256 .clone();
5257 let to_node = processes
5258 .get(to)
5259 .unwrap_or_else(|| {
5260 panic!("A process used in the graph was not instantiated: {}", to)
5261 })
5262 .clone();
5263
5264 let sink_port = from_node.next_port();
5265 let source_port = to_node.next_port();
5266
5267 (
5268 D::o2o_sink_source(
5269 env,
5270 &from_node,
5271 &sink_port,
5272 &to_node,
5273 &source_port,
5274 name,
5275 networking_info,
5276 ),
5277 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
5278 )
5279 }
5280 (&LocationId::Process(from), &LocationId::Cluster(to)) => {
5281 let from_node = processes
5282 .get(from)
5283 .unwrap_or_else(|| {
5284 panic!("A process used in the graph was not instantiated: {}", from)
5285 })
5286 .clone();
5287 let to_node = clusters
5288 .get(to)
5289 .unwrap_or_else(|| {
5290 panic!("A cluster used in the graph was not instantiated: {}", to)
5291 })
5292 .clone();
5293
5294 let sink_port = from_node.next_port();
5295 let source_port = to_node.next_port();
5296
5297 (
5298 D::o2m_sink_source(
5299 env,
5300 &from_node,
5301 &sink_port,
5302 &to_node,
5303 &source_port,
5304 name,
5305 networking_info,
5306 ),
5307 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
5308 )
5309 }
5310 (&LocationId::Cluster(from), &LocationId::Process(to)) => {
5311 let from_node = clusters
5312 .get(from)
5313 .unwrap_or_else(|| {
5314 panic!("A cluster used in the graph was not instantiated: {}", from)
5315 })
5316 .clone();
5317 let to_node = processes
5318 .get(to)
5319 .unwrap_or_else(|| {
5320 panic!("A process used in the graph was not instantiated: {}", to)
5321 })
5322 .clone();
5323
5324 let sink_port = from_node.next_port();
5325 let source_port = to_node.next_port();
5326
5327 (
5328 D::m2o_sink_source(
5329 env,
5330 &from_node,
5331 &sink_port,
5332 &to_node,
5333 &source_port,
5334 name,
5335 networking_info,
5336 ),
5337 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
5338 )
5339 }
5340 (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
5341 let from_node = clusters
5342 .get(from)
5343 .unwrap_or_else(|| {
5344 panic!("A cluster used in the graph was not instantiated: {}", from)
5345 })
5346 .clone();
5347 let to_node = clusters
5348 .get(to)
5349 .unwrap_or_else(|| {
5350 panic!("A cluster used in the graph was not instantiated: {}", to)
5351 })
5352 .clone();
5353
5354 let sink_port = from_node.next_port();
5355 let source_port = to_node.next_port();
5356
5357 (
5358 D::m2m_sink_source(
5359 env,
5360 &from_node,
5361 &sink_port,
5362 &to_node,
5363 &source_port,
5364 name,
5365 networking_info,
5366 ),
5367 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
5368 )
5369 }
5370 (LocationId::Tick(_, _), _) => panic!(),
5371 (_, LocationId::Tick(_, _)) => panic!(),
5372 (LocationId::Atomic(_), _) => panic!(),
5373 (_, LocationId::Atomic(_)) => panic!(),
5374 };
5375 (sink, source, connect_fn)
5376}
5377
5378#[cfg(test)]
5379mod serde_test;
5380
5381#[cfg(test)]
5382mod test {
5383 use std::mem::size_of;
5384
5385 use stageleft::{QuotedWithContext, q};
5386
5387 use super::*;
5388
5389 #[test]
5390 #[cfg_attr(
5391 not(feature = "build"),
5392 ignore = "expects inclusion of feature-gated fields"
5393 )]
5394 fn hydro_node_size() {
5395 assert_eq!(size_of::<HydroNode>(), 264);
5396 }
5397
5398 #[test]
5399 #[cfg_attr(
5400 not(feature = "build"),
5401 ignore = "expects inclusion of feature-gated fields"
5402 )]
5403 fn hydro_root_size() {
5404 assert_eq!(size_of::<HydroRoot>(), 136);
5405 }
5406
5407 #[test]
5408 fn test_simplify_q_macro_basic() {
5409 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
5411 let result = simplify_q_macro(simple_expr.clone());
5412 assert_eq!(result, simple_expr);
5413 }
5414
5415 #[test]
5416 fn test_simplify_q_macro_actual_stageleft_call() {
5417 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
5419 let result = simplify_q_macro(stageleft_call);
5420 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5423 }
5424
5425 #[test]
5426 fn test_closure_no_pipe_at_start() {
5427 let stageleft_call = q!({
5429 let foo = 123;
5430 move |b: usize| b + foo
5431 })
5432 .splice_fn1_ctx(&());
5433 let result = simplify_q_macro(stageleft_call);
5434 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
5435 }
5436}