1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
37#[derive(Clone, Hash)]
41pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl From<syn::Expr> for DebugExpr {
44 fn from(expr: syn::Expr) -> Self {
45 Self(Box::new(expr))
46 }
47}
48
49impl Deref for DebugExpr {
50 type Target = syn::Expr;
51
52 fn deref(&self) -> &Self::Target {
53 &self.0
54 }
55}
56
57impl ToTokens for DebugExpr {
58 fn to_tokens(&self, tokens: &mut TokenStream) {
59 self.0.to_tokens(tokens);
60 }
61}
62
63impl Debug for DebugExpr {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 write!(f, "{}", self.0.to_token_stream())
66 }
67}
68
69impl Display for DebugExpr {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 let original = self.0.as_ref().clone();
72 let simplified = simplify_q_macro(original);
73
74 write!(f, "q!({})", quote::quote!(#simplified))
77 }
78}
79
80fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
82 let mut simplifier = QMacroSimplifier::new();
85 simplifier.visit_expr_mut(&mut expr);
86
87 if let Some(simplified) = simplifier.simplified_result {
89 simplified
90 } else {
91 expr
92 }
93}
94
95#[derive(Default)]
97pub struct QMacroSimplifier {
98 pub simplified_result: Option<syn::Expr>,
99}
100
101impl QMacroSimplifier {
102 pub fn new() -> Self {
103 Self::default()
104 }
105}
106
107impl VisitMut for QMacroSimplifier {
108 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
109 if self.simplified_result.is_some() {
111 return;
112 }
113
114 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
115 && self.is_stageleft_runtime_support_call(&path_expr.path)
117 && let Some(closure) = self.extract_closure_from_args(&call.args)
119 {
120 self.simplified_result = Some(closure);
121 return;
122 }
123
124 syn::visit_mut::visit_expr_mut(self, expr);
127 }
128}
129
130impl QMacroSimplifier {
131 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
132 if let Some(last_segment) = path.segments.last() {
134 let fn_name = last_segment.ident.to_string();
135 fn_name.contains("_type_hint")
137 && path.segments.len() > 2
138 && path.segments[0].ident == "stageleft"
139 && path.segments[1].ident == "runtime_support"
140 } else {
141 false
142 }
143 }
144
145 fn extract_closure_from_args(
146 &self,
147 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
148 ) -> Option<syn::Expr> {
149 for arg in args {
151 if let syn::Expr::Closure(_) = arg {
152 return Some(arg.clone());
153 }
154 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
156 return Some(closure_expr);
157 }
158 }
159 None
160 }
161
162 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
163 let mut visitor = ClosureFinder {
164 found_closure: None,
165 prefer_inner_blocks: true,
166 };
167 visitor.visit_expr(expr);
168 visitor.found_closure
169 }
170}
171
172struct ClosureFinder {
174 found_closure: Option<syn::Expr>,
175 prefer_inner_blocks: bool,
176}
177
178impl<'ast> Visit<'ast> for ClosureFinder {
179 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
180 if self.found_closure.is_some() {
182 return;
183 }
184
185 match expr {
186 syn::Expr::Closure(_) => {
187 self.found_closure = Some(expr.clone());
188 }
189 syn::Expr::Block(block) if self.prefer_inner_blocks => {
190 for stmt in &block.block.stmts {
192 if let syn::Stmt::Expr(stmt_expr, _) = stmt
193 && let syn::Expr::Block(_) = stmt_expr
194 {
195 let mut inner_visitor = ClosureFinder {
197 found_closure: None,
198 prefer_inner_blocks: false, };
200 inner_visitor.visit_expr(stmt_expr);
201 if inner_visitor.found_closure.is_some() {
202 self.found_closure = Some(stmt_expr.clone());
204 return;
205 }
206 }
207 }
208
209 visit::visit_expr(self, expr);
211
212 if self.found_closure.is_some() {
215 }
217 }
218 _ => {
219 visit::visit_expr(self, expr);
221 }
222 }
223 }
224}
225
226#[derive(Clone, PartialEq, Eq, Hash)]
230pub struct DebugType(pub Box<syn::Type>);
231
232impl From<syn::Type> for DebugType {
233 fn from(t: syn::Type) -> Self {
234 Self(Box::new(t))
235 }
236}
237
238impl Deref for DebugType {
239 type Target = syn::Type;
240
241 fn deref(&self) -> &Self::Target {
242 &self.0
243 }
244}
245
246impl ToTokens for DebugType {
247 fn to_tokens(&self, tokens: &mut TokenStream) {
248 self.0.to_tokens(tokens);
249 }
250}
251
252impl Debug for DebugType {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 write!(f, "{}", self.0.to_token_stream())
255 }
256}
257
258pub enum DebugInstantiate {
259 Building,
260 Finalized(Box<DebugInstantiateFinalized>),
261}
262
263#[cfg_attr(
264 not(feature = "build"),
265 expect(
266 dead_code,
267 reason = "sink, source unused without `feature = \"build\"`."
268 )
269)]
270pub struct DebugInstantiateFinalized {
271 sink: syn::Expr,
272 source: syn::Expr,
273 connect_fn: Option<Box<dyn FnOnce()>>,
274}
275
276impl From<DebugInstantiateFinalized> for DebugInstantiate {
277 fn from(f: DebugInstantiateFinalized) -> Self {
278 Self::Finalized(Box::new(f))
279 }
280}
281
282impl Debug for DebugInstantiate {
283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284 write!(f, "<network instantiate>")
285 }
286}
287
288impl Hash for DebugInstantiate {
289 fn hash<H: Hasher>(&self, _state: &mut H) {
290 }
292}
293
294impl Clone for DebugInstantiate {
295 fn clone(&self) -> Self {
296 match self {
297 DebugInstantiate::Building => DebugInstantiate::Building,
298 DebugInstantiate::Finalized(_) => {
299 panic!("DebugInstantiate::Finalized should not be cloned")
300 }
301 }
302 }
303}
304
305#[derive(Debug, Hash, Clone)]
314pub enum ClusterMembersState {
315 Uninit,
317 Stream(DebugExpr),
320 Tee(LocationId, LocationId),
324}
325
326#[derive(Debug, Hash, Clone)]
328pub enum HydroSource {
329 Stream(DebugExpr),
330 ExternalNetwork(),
331 Iter(DebugExpr),
332 Spin(),
333 ClusterMembers(LocationId, ClusterMembersState),
334 Embedded(syn::Ident),
335 EmbeddedSingleton(syn::Ident),
336}
337
338#[cfg(feature = "build")]
339pub trait DfirBuilder {
345 fn singleton_intermediates(&self) -> bool;
347
348 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
350
351 fn batch(
352 &mut self,
353 in_ident: syn::Ident,
354 in_location: &LocationId,
355 in_kind: &CollectionKind,
356 out_ident: &syn::Ident,
357 out_location: &LocationId,
358 op_meta: &HydroIrOpMetadata,
359 );
360 fn yield_from_tick(
361 &mut self,
362 in_ident: syn::Ident,
363 in_location: &LocationId,
364 in_kind: &CollectionKind,
365 out_ident: &syn::Ident,
366 out_location: &LocationId,
367 );
368
369 fn begin_atomic(
370 &mut self,
371 in_ident: syn::Ident,
372 in_location: &LocationId,
373 in_kind: &CollectionKind,
374 out_ident: &syn::Ident,
375 out_location: &LocationId,
376 op_meta: &HydroIrOpMetadata,
377 );
378 fn end_atomic(
379 &mut self,
380 in_ident: syn::Ident,
381 in_location: &LocationId,
382 in_kind: &CollectionKind,
383 out_ident: &syn::Ident,
384 );
385
386 #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
387 fn observe_nondet(
388 &mut self,
389 trusted: bool,
390 location: &LocationId,
391 in_ident: syn::Ident,
392 in_kind: &CollectionKind,
393 out_ident: &syn::Ident,
394 out_kind: &CollectionKind,
395 op_meta: &HydroIrOpMetadata,
396 );
397
398 #[expect(clippy::too_many_arguments, reason = "TODO")]
399 fn create_network(
400 &mut self,
401 from: &LocationId,
402 to: &LocationId,
403 input_ident: syn::Ident,
404 out_ident: &syn::Ident,
405 serialize: Option<&DebugExpr>,
406 sink: syn::Expr,
407 source: syn::Expr,
408 deserialize: Option<&DebugExpr>,
409 tag_id: usize,
410 networking_info: &crate::networking::NetworkingInfo,
411 );
412
413 fn create_external_source(
414 &mut self,
415 on: &LocationId,
416 source_expr: syn::Expr,
417 out_ident: &syn::Ident,
418 deserialize: Option<&DebugExpr>,
419 tag_id: usize,
420 );
421
422 fn create_external_output(
423 &mut self,
424 on: &LocationId,
425 sink_expr: syn::Expr,
426 input_ident: &syn::Ident,
427 serialize: Option<&DebugExpr>,
428 tag_id: usize,
429 );
430}
431
432#[cfg(feature = "build")]
433impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
434 fn singleton_intermediates(&self) -> bool {
435 false
436 }
437
438 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
439 self.entry(location.root().key())
440 .expect("location was removed")
441 .or_default()
442 }
443
444 fn batch(
445 &mut self,
446 in_ident: syn::Ident,
447 in_location: &LocationId,
448 in_kind: &CollectionKind,
449 out_ident: &syn::Ident,
450 _out_location: &LocationId,
451 _op_meta: &HydroIrOpMetadata,
452 ) {
453 let builder = self.get_dfir_mut(in_location.root());
454 if in_kind.is_bounded()
455 && matches!(
456 in_kind,
457 CollectionKind::Singleton { .. }
458 | CollectionKind::Optional { .. }
459 | CollectionKind::KeyedSingleton { .. }
460 )
461 {
462 assert!(in_location.is_top_level());
463 builder.add_dfir(
464 parse_quote! {
465 #out_ident = #in_ident -> persist::<'static>();
466 },
467 None,
468 None,
469 );
470 } else {
471 builder.add_dfir(
472 parse_quote! {
473 #out_ident = #in_ident;
474 },
475 None,
476 None,
477 );
478 }
479 }
480
481 fn yield_from_tick(
482 &mut self,
483 in_ident: syn::Ident,
484 in_location: &LocationId,
485 _in_kind: &CollectionKind,
486 out_ident: &syn::Ident,
487 _out_location: &LocationId,
488 ) {
489 let builder = self.get_dfir_mut(in_location.root());
490 builder.add_dfir(
491 parse_quote! {
492 #out_ident = #in_ident;
493 },
494 None,
495 None,
496 );
497 }
498
499 fn begin_atomic(
500 &mut self,
501 in_ident: syn::Ident,
502 in_location: &LocationId,
503 _in_kind: &CollectionKind,
504 out_ident: &syn::Ident,
505 _out_location: &LocationId,
506 _op_meta: &HydroIrOpMetadata,
507 ) {
508 let builder = self.get_dfir_mut(in_location.root());
509 builder.add_dfir(
510 parse_quote! {
511 #out_ident = #in_ident;
512 },
513 None,
514 None,
515 );
516 }
517
518 fn end_atomic(
519 &mut self,
520 in_ident: syn::Ident,
521 in_location: &LocationId,
522 _in_kind: &CollectionKind,
523 out_ident: &syn::Ident,
524 ) {
525 let builder = self.get_dfir_mut(in_location.root());
526 builder.add_dfir(
527 parse_quote! {
528 #out_ident = #in_ident;
529 },
530 None,
531 None,
532 );
533 }
534
535 fn observe_nondet(
536 &mut self,
537 _trusted: bool,
538 location: &LocationId,
539 in_ident: syn::Ident,
540 _in_kind: &CollectionKind,
541 out_ident: &syn::Ident,
542 _out_kind: &CollectionKind,
543 _op_meta: &HydroIrOpMetadata,
544 ) {
545 let builder = self.get_dfir_mut(location);
546 builder.add_dfir(
547 parse_quote! {
548 #out_ident = #in_ident;
549 },
550 None,
551 None,
552 );
553 }
554
555 fn create_network(
556 &mut self,
557 from: &LocationId,
558 to: &LocationId,
559 input_ident: syn::Ident,
560 out_ident: &syn::Ident,
561 serialize: Option<&DebugExpr>,
562 sink: syn::Expr,
563 source: syn::Expr,
564 deserialize: Option<&DebugExpr>,
565 tag_id: usize,
566 _networking_info: &crate::networking::NetworkingInfo,
567 ) {
568 let sender_builder = self.get_dfir_mut(from);
569 if let Some(serialize_pipeline) = serialize {
570 sender_builder.add_dfir(
571 parse_quote! {
572 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
573 },
574 None,
575 Some(&format!("send{}", tag_id)),
577 );
578 } else {
579 sender_builder.add_dfir(
580 parse_quote! {
581 #input_ident -> dest_sink(#sink);
582 },
583 None,
584 Some(&format!("send{}", tag_id)),
585 );
586 }
587
588 let receiver_builder = self.get_dfir_mut(to);
589 if let Some(deserialize_pipeline) = deserialize {
590 receiver_builder.add_dfir(
591 parse_quote! {
592 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
593 },
594 None,
595 Some(&format!("recv{}", tag_id)),
596 );
597 } else {
598 receiver_builder.add_dfir(
599 parse_quote! {
600 #out_ident = source_stream(#source);
601 },
602 None,
603 Some(&format!("recv{}", tag_id)),
604 );
605 }
606 }
607
608 fn create_external_source(
609 &mut self,
610 on: &LocationId,
611 source_expr: syn::Expr,
612 out_ident: &syn::Ident,
613 deserialize: Option<&DebugExpr>,
614 tag_id: usize,
615 ) {
616 let receiver_builder = self.get_dfir_mut(on);
617 if let Some(deserialize_pipeline) = deserialize {
618 receiver_builder.add_dfir(
619 parse_quote! {
620 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
621 },
622 None,
623 Some(&format!("recv{}", tag_id)),
624 );
625 } else {
626 receiver_builder.add_dfir(
627 parse_quote! {
628 #out_ident = source_stream(#source_expr);
629 },
630 None,
631 Some(&format!("recv{}", tag_id)),
632 );
633 }
634 }
635
636 fn create_external_output(
637 &mut self,
638 on: &LocationId,
639 sink_expr: syn::Expr,
640 input_ident: &syn::Ident,
641 serialize: Option<&DebugExpr>,
642 tag_id: usize,
643 ) {
644 let sender_builder = self.get_dfir_mut(on);
645 if let Some(serialize_fn) = serialize {
646 sender_builder.add_dfir(
647 parse_quote! {
648 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
649 },
650 None,
651 Some(&format!("send{}", tag_id)),
653 );
654 } else {
655 sender_builder.add_dfir(
656 parse_quote! {
657 #input_ident -> dest_sink(#sink_expr);
658 },
659 None,
660 Some(&format!("send{}", tag_id)),
661 );
662 }
663 }
664}
665
666#[cfg(feature = "build")]
667pub enum BuildersOrCallback<'a, L, N>
668where
669 L: FnMut(&mut HydroRoot, &mut usize),
670 N: FnMut(&mut HydroNode, &mut usize),
671{
672 Builders(&'a mut dyn DfirBuilder),
673 Callback(L, N),
674}
675
676#[derive(Debug, Hash)]
680pub enum HydroRoot {
681 ForEach {
682 f: DebugExpr,
683 input: Box<HydroNode>,
684 op_metadata: HydroIrOpMetadata,
685 },
686 SendExternal {
687 to_external_key: LocationKey,
688 to_port_id: ExternalPortId,
689 to_many: bool,
690 unpaired: bool,
691 serialize_fn: Option<DebugExpr>,
692 instantiate_fn: DebugInstantiate,
693 input: Box<HydroNode>,
694 op_metadata: HydroIrOpMetadata,
695 },
696 DestSink {
697 sink: DebugExpr,
698 input: Box<HydroNode>,
699 op_metadata: HydroIrOpMetadata,
700 },
701 CycleSink {
702 cycle_id: CycleId,
703 input: Box<HydroNode>,
704 op_metadata: HydroIrOpMetadata,
705 },
706 EmbeddedOutput {
707 ident: syn::Ident,
708 input: Box<HydroNode>,
709 op_metadata: HydroIrOpMetadata,
710 },
711 Null {
712 input: Box<HydroNode>,
713 op_metadata: HydroIrOpMetadata,
714 },
715}
716
717impl HydroRoot {
718 #[cfg(feature = "build")]
719 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
720 pub fn compile_network<'a, D>(
721 &mut self,
722 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
723 seen_tees: &mut SeenSharedNodes,
724 seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
725 processes: &SparseSecondaryMap<LocationKey, D::Process>,
726 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
727 externals: &SparseSecondaryMap<LocationKey, D::External>,
728 env: &mut D::InstantiateEnv,
729 ) where
730 D: Deploy<'a>,
731 {
732 let refcell_extra_stmts = RefCell::new(extra_stmts);
733 let refcell_env = RefCell::new(env);
734 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
735 self.transform_bottom_up(
736 &mut |l| {
737 if let HydroRoot::SendExternal {
738 input,
739 to_external_key,
740 to_port_id,
741 to_many,
742 unpaired,
743 instantiate_fn,
744 ..
745 } = l
746 {
747 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
748 DebugInstantiate::Building => {
749 let to_node = externals
750 .get(*to_external_key)
751 .unwrap_or_else(|| {
752 panic!("A external used in the graph was not instantiated: {}", to_external_key)
753 })
754 .clone();
755
756 match input.metadata().location_id.root() {
757 &LocationId::Process(process_key) => {
758 if *to_many {
759 (
760 (
761 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
762 parse_quote!(DUMMY),
763 ),
764 Box::new(|| {}) as Box<dyn FnOnce()>,
765 )
766 } else {
767 let from_node = processes
768 .get(process_key)
769 .unwrap_or_else(|| {
770 panic!("A process used in the graph was not instantiated: {}", process_key)
771 })
772 .clone();
773
774 let sink_port = from_node.next_port();
775 let source_port = to_node.next_port();
776
777 if *unpaired {
778 use stageleft::quote_type;
779 use tokio_util::codec::LengthDelimitedCodec;
780
781 to_node.register(*to_port_id, source_port.clone());
782
783 let _ = D::e2o_source(
784 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
785 &to_node, &source_port,
786 &from_node, &sink_port,
787 "e_type::<LengthDelimitedCodec>(),
788 format!("{}_{}", *to_external_key, *to_port_id)
789 );
790 }
791
792 (
793 (
794 D::o2e_sink(
795 &from_node,
796 &sink_port,
797 &to_node,
798 &source_port,
799 format!("{}_{}", *to_external_key, *to_port_id)
800 ),
801 parse_quote!(DUMMY),
802 ),
803 if *unpaired {
804 D::e2o_connect(
805 &to_node,
806 &source_port,
807 &from_node,
808 &sink_port,
809 *to_many,
810 NetworkHint::Auto,
811 )
812 } else {
813 Box::new(|| {}) as Box<dyn FnOnce()>
814 },
815 )
816 }
817 }
818 LocationId::Cluster(cluster_key) => {
819 let from_node = clusters
820 .get(*cluster_key)
821 .unwrap_or_else(|| {
822 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
823 })
824 .clone();
825
826 let sink_port = from_node.next_port();
827 let source_port = to_node.next_port();
828
829 if *unpaired {
830 to_node.register(*to_port_id, source_port.clone());
831 }
832
833 (
834 (
835 D::m2e_sink(
836 &from_node,
837 &sink_port,
838 &to_node,
839 &source_port,
840 format!("{}_{}", *to_external_key, *to_port_id)
841 ),
842 parse_quote!(DUMMY),
843 ),
844 Box::new(|| {}) as Box<dyn FnOnce()>,
845 )
846 }
847 _ => panic!()
848 }
849 },
850
851 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
852 };
853
854 *instantiate_fn = DebugInstantiateFinalized {
855 sink: sink_expr,
856 source: source_expr,
857 connect_fn: Some(connect_fn),
858 }
859 .into();
860 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
861 let element_type = match &input.metadata().collection_kind {
862 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
863 _ => panic!("Embedded output must have Stream collection kind"),
864 };
865 let location_key = match input.metadata().location_id.root() {
866 LocationId::Process(key) | LocationId::Cluster(key) => *key,
867 _ => panic!("Embedded output must be on a process or cluster"),
868 };
869 D::register_embedded_output(
870 &mut refcell_env.borrow_mut(),
871 location_key,
872 ident,
873 &element_type,
874 );
875 }
876 },
877 &mut |n| {
878 if let HydroNode::Network {
879 name,
880 networking_info,
881 input,
882 instantiate_fn,
883 metadata,
884 ..
885 } = n
886 {
887 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
888 DebugInstantiate::Building => instantiate_network::<D>(
889 &mut refcell_env.borrow_mut(),
890 input.metadata().location_id.root(),
891 metadata.location_id.root(),
892 processes,
893 clusters,
894 name.as_deref(),
895 networking_info,
896 ),
897
898 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
899 };
900
901 *instantiate_fn = DebugInstantiateFinalized {
902 sink: sink_expr,
903 source: source_expr,
904 connect_fn: Some(connect_fn),
905 }
906 .into();
907 } else if let HydroNode::ExternalInput {
908 from_external_key,
909 from_port_id,
910 from_many,
911 codec_type,
912 port_hint,
913 instantiate_fn,
914 metadata,
915 ..
916 } = n
917 {
918 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
919 DebugInstantiate::Building => {
920 let from_node = externals
921 .get(*from_external_key)
922 .unwrap_or_else(|| {
923 panic!(
924 "A external used in the graph was not instantiated: {}",
925 from_external_key,
926 )
927 })
928 .clone();
929
930 match metadata.location_id.root() {
931 &LocationId::Process(process_key) => {
932 let to_node = processes
933 .get(process_key)
934 .unwrap_or_else(|| {
935 panic!("A process used in the graph was not instantiated: {}", process_key)
936 })
937 .clone();
938
939 let sink_port = from_node.next_port();
940 let source_port = to_node.next_port();
941
942 from_node.register(*from_port_id, sink_port.clone());
943
944 (
945 (
946 parse_quote!(DUMMY),
947 if *from_many {
948 D::e2o_many_source(
949 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
950 &to_node, &source_port,
951 codec_type.0.as_ref(),
952 format!("{}_{}", *from_external_key, *from_port_id)
953 )
954 } else {
955 D::e2o_source(
956 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
957 &from_node, &sink_port,
958 &to_node, &source_port,
959 codec_type.0.as_ref(),
960 format!("{}_{}", *from_external_key, *from_port_id)
961 )
962 },
963 ),
964 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
965 )
966 }
967 LocationId::Cluster(cluster_key) => {
968 let to_node = clusters
969 .get(*cluster_key)
970 .unwrap_or_else(|| {
971 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
972 })
973 .clone();
974
975 let sink_port = from_node.next_port();
976 let source_port = to_node.next_port();
977
978 from_node.register(*from_port_id, sink_port.clone());
979
980 (
981 (
982 parse_quote!(DUMMY),
983 D::e2m_source(
984 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
985 &from_node, &sink_port,
986 &to_node, &source_port,
987 codec_type.0.as_ref(),
988 format!("{}_{}", *from_external_key, *from_port_id)
989 ),
990 ),
991 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
992 )
993 }
994 _ => panic!()
995 }
996 },
997
998 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
999 };
1000
1001 *instantiate_fn = DebugInstantiateFinalized {
1002 sink: sink_expr,
1003 source: source_expr,
1004 connect_fn: Some(connect_fn),
1005 }
1006 .into();
1007 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1008 let element_type = match &metadata.collection_kind {
1009 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1010 _ => panic!("Embedded source must have Stream collection kind"),
1011 };
1012 let location_key = match metadata.location_id.root() {
1013 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1014 _ => panic!("Embedded source must be on a process or cluster"),
1015 };
1016 D::register_embedded_stream_input(
1017 &mut refcell_env.borrow_mut(),
1018 location_key,
1019 ident,
1020 &element_type,
1021 );
1022 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1023 let element_type = match &metadata.collection_kind {
1024 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1025 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1026 };
1027 let location_key = match metadata.location_id.root() {
1028 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1029 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1030 };
1031 D::register_embedded_singleton_input(
1032 &mut refcell_env.borrow_mut(),
1033 location_key,
1034 ident,
1035 &element_type,
1036 );
1037 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1038 match state {
1039 ClusterMembersState::Uninit => {
1040 let at_location = metadata.location_id.root().clone();
1041 let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1042 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1043 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1045 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1046 &(),
1047 );
1048 *state = ClusterMembersState::Stream(expr.into());
1049 } else {
1050 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1052 }
1053 }
1054 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1055 panic!("cluster members already finalized");
1056 }
1057 }
1058 }
1059 },
1060 seen_tees,
1061 false,
1062 );
1063 }
1064
1065 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1066 self.transform_bottom_up(
1067 &mut |l| {
1068 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1069 match instantiate_fn {
1070 DebugInstantiate::Building => panic!("network not built"),
1071
1072 DebugInstantiate::Finalized(finalized) => {
1073 (finalized.connect_fn.take().unwrap())();
1074 }
1075 }
1076 }
1077 },
1078 &mut |n| {
1079 if let HydroNode::Network { instantiate_fn, .. }
1080 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1081 {
1082 match instantiate_fn {
1083 DebugInstantiate::Building => panic!("network not built"),
1084
1085 DebugInstantiate::Finalized(finalized) => {
1086 (finalized.connect_fn.take().unwrap())();
1087 }
1088 }
1089 }
1090 },
1091 seen_tees,
1092 false,
1093 );
1094 }
1095
1096 pub fn transform_bottom_up(
1097 &mut self,
1098 transform_root: &mut impl FnMut(&mut HydroRoot),
1099 transform_node: &mut impl FnMut(&mut HydroNode),
1100 seen_tees: &mut SeenSharedNodes,
1101 check_well_formed: bool,
1102 ) {
1103 self.transform_children(
1104 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1105 seen_tees,
1106 );
1107
1108 transform_root(self);
1109 }
1110
1111 pub fn transform_children(
1112 &mut self,
1113 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1114 seen_tees: &mut SeenSharedNodes,
1115 ) {
1116 match self {
1117 HydroRoot::ForEach { input, .. }
1118 | HydroRoot::SendExternal { input, .. }
1119 | HydroRoot::DestSink { input, .. }
1120 | HydroRoot::CycleSink { input, .. }
1121 | HydroRoot::EmbeddedOutput { input, .. }
1122 | HydroRoot::Null { input, .. } => {
1123 transform(input, seen_tees);
1124 }
1125 }
1126 }
1127
1128 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1129 match self {
1130 HydroRoot::ForEach {
1131 f,
1132 input,
1133 op_metadata,
1134 } => HydroRoot::ForEach {
1135 f: f.clone(),
1136 input: Box::new(input.deep_clone(seen_tees)),
1137 op_metadata: op_metadata.clone(),
1138 },
1139 HydroRoot::SendExternal {
1140 to_external_key,
1141 to_port_id,
1142 to_many,
1143 unpaired,
1144 serialize_fn,
1145 instantiate_fn,
1146 input,
1147 op_metadata,
1148 } => HydroRoot::SendExternal {
1149 to_external_key: *to_external_key,
1150 to_port_id: *to_port_id,
1151 to_many: *to_many,
1152 unpaired: *unpaired,
1153 serialize_fn: serialize_fn.clone(),
1154 instantiate_fn: instantiate_fn.clone(),
1155 input: Box::new(input.deep_clone(seen_tees)),
1156 op_metadata: op_metadata.clone(),
1157 },
1158 HydroRoot::DestSink {
1159 sink,
1160 input,
1161 op_metadata,
1162 } => HydroRoot::DestSink {
1163 sink: sink.clone(),
1164 input: Box::new(input.deep_clone(seen_tees)),
1165 op_metadata: op_metadata.clone(),
1166 },
1167 HydroRoot::CycleSink {
1168 cycle_id,
1169 input,
1170 op_metadata,
1171 } => HydroRoot::CycleSink {
1172 cycle_id: *cycle_id,
1173 input: Box::new(input.deep_clone(seen_tees)),
1174 op_metadata: op_metadata.clone(),
1175 },
1176 HydroRoot::EmbeddedOutput {
1177 ident,
1178 input,
1179 op_metadata,
1180 } => HydroRoot::EmbeddedOutput {
1181 ident: ident.clone(),
1182 input: Box::new(input.deep_clone(seen_tees)),
1183 op_metadata: op_metadata.clone(),
1184 },
1185 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1186 input: Box::new(input.deep_clone(seen_tees)),
1187 op_metadata: op_metadata.clone(),
1188 },
1189 }
1190 }
1191
1192 #[cfg(feature = "build")]
1193 pub fn emit(
1194 &mut self,
1195 graph_builders: &mut dyn DfirBuilder,
1196 seen_tees: &mut SeenSharedNodes,
1197 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1198 next_stmt_id: &mut usize,
1199 ) {
1200 self.emit_core(
1201 &mut BuildersOrCallback::<
1202 fn(&mut HydroRoot, &mut usize),
1203 fn(&mut HydroNode, &mut usize),
1204 >::Builders(graph_builders),
1205 seen_tees,
1206 built_tees,
1207 next_stmt_id,
1208 );
1209 }
1210
1211 #[cfg(feature = "build")]
1212 pub fn emit_core(
1213 &mut self,
1214 builders_or_callback: &mut BuildersOrCallback<
1215 impl FnMut(&mut HydroRoot, &mut usize),
1216 impl FnMut(&mut HydroNode, &mut usize),
1217 >,
1218 seen_tees: &mut SeenSharedNodes,
1219 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1220 next_stmt_id: &mut usize,
1221 ) {
1222 match self {
1223 HydroRoot::ForEach { f, input, .. } => {
1224 let input_ident =
1225 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1226
1227 match builders_or_callback {
1228 BuildersOrCallback::Builders(graph_builders) => {
1229 graph_builders
1230 .get_dfir_mut(&input.metadata().location_id)
1231 .add_dfir(
1232 parse_quote! {
1233 #input_ident -> for_each(#f);
1234 },
1235 None,
1236 Some(&next_stmt_id.to_string()),
1237 );
1238 }
1239 BuildersOrCallback::Callback(leaf_callback, _) => {
1240 leaf_callback(self, next_stmt_id);
1241 }
1242 }
1243
1244 *next_stmt_id += 1;
1245 }
1246
1247 HydroRoot::SendExternal {
1248 serialize_fn,
1249 instantiate_fn,
1250 input,
1251 ..
1252 } => {
1253 let input_ident =
1254 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1255
1256 match builders_or_callback {
1257 BuildersOrCallback::Builders(graph_builders) => {
1258 let (sink_expr, _) = match instantiate_fn {
1259 DebugInstantiate::Building => (
1260 syn::parse_quote!(DUMMY_SINK),
1261 syn::parse_quote!(DUMMY_SOURCE),
1262 ),
1263
1264 DebugInstantiate::Finalized(finalized) => {
1265 (finalized.sink.clone(), finalized.source.clone())
1266 }
1267 };
1268
1269 graph_builders.create_external_output(
1270 &input.metadata().location_id,
1271 sink_expr,
1272 &input_ident,
1273 serialize_fn.as_ref(),
1274 *next_stmt_id,
1275 );
1276 }
1277 BuildersOrCallback::Callback(leaf_callback, _) => {
1278 leaf_callback(self, next_stmt_id);
1279 }
1280 }
1281
1282 *next_stmt_id += 1;
1283 }
1284
1285 HydroRoot::DestSink { sink, input, .. } => {
1286 let input_ident =
1287 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1288
1289 match builders_or_callback {
1290 BuildersOrCallback::Builders(graph_builders) => {
1291 graph_builders
1292 .get_dfir_mut(&input.metadata().location_id)
1293 .add_dfir(
1294 parse_quote! {
1295 #input_ident -> dest_sink(#sink);
1296 },
1297 None,
1298 Some(&next_stmt_id.to_string()),
1299 );
1300 }
1301 BuildersOrCallback::Callback(leaf_callback, _) => {
1302 leaf_callback(self, next_stmt_id);
1303 }
1304 }
1305
1306 *next_stmt_id += 1;
1307 }
1308
1309 HydroRoot::CycleSink {
1310 cycle_id, input, ..
1311 } => {
1312 let input_ident =
1313 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1314
1315 match builders_or_callback {
1316 BuildersOrCallback::Builders(graph_builders) => {
1317 let elem_type: syn::Type = match &input.metadata().collection_kind {
1318 CollectionKind::KeyedSingleton {
1319 key_type,
1320 value_type,
1321 ..
1322 }
1323 | CollectionKind::KeyedStream {
1324 key_type,
1325 value_type,
1326 ..
1327 } => {
1328 parse_quote!((#key_type, #value_type))
1329 }
1330 CollectionKind::Stream { element_type, .. }
1331 | CollectionKind::Singleton { element_type, .. }
1332 | CollectionKind::Optional { element_type, .. } => {
1333 parse_quote!(#element_type)
1334 }
1335 };
1336
1337 let cycle_id_ident = cycle_id.as_ident();
1338 graph_builders
1339 .get_dfir_mut(&input.metadata().location_id)
1340 .add_dfir(
1341 parse_quote! {
1342 #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1343 },
1344 None,
1345 None,
1346 );
1347 }
1348 BuildersOrCallback::Callback(_, _) => {}
1350 }
1351 }
1352
1353 HydroRoot::EmbeddedOutput { ident, input, .. } => {
1354 let input_ident =
1355 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1356
1357 match builders_or_callback {
1358 BuildersOrCallback::Builders(graph_builders) => {
1359 graph_builders
1360 .get_dfir_mut(&input.metadata().location_id)
1361 .add_dfir(
1362 parse_quote! {
1363 #input_ident -> for_each(&mut #ident);
1364 },
1365 None,
1366 Some(&next_stmt_id.to_string()),
1367 );
1368 }
1369 BuildersOrCallback::Callback(leaf_callback, _) => {
1370 leaf_callback(self, next_stmt_id);
1371 }
1372 }
1373
1374 *next_stmt_id += 1;
1375 }
1376
1377 HydroRoot::Null { input, .. } => {
1378 let input_ident =
1379 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1380
1381 match builders_or_callback {
1382 BuildersOrCallback::Builders(graph_builders) => {
1383 graph_builders
1384 .get_dfir_mut(&input.metadata().location_id)
1385 .add_dfir(
1386 parse_quote! {
1387 #input_ident -> for_each(|_| {});
1388 },
1389 None,
1390 Some(&next_stmt_id.to_string()),
1391 );
1392 }
1393 BuildersOrCallback::Callback(leaf_callback, _) => {
1394 leaf_callback(self, next_stmt_id);
1395 }
1396 }
1397
1398 *next_stmt_id += 1;
1399 }
1400 }
1401 }
1402
1403 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1404 match self {
1405 HydroRoot::ForEach { op_metadata, .. }
1406 | HydroRoot::SendExternal { op_metadata, .. }
1407 | HydroRoot::DestSink { op_metadata, .. }
1408 | HydroRoot::CycleSink { op_metadata, .. }
1409 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1410 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1411 }
1412 }
1413
1414 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1415 match self {
1416 HydroRoot::ForEach { op_metadata, .. }
1417 | HydroRoot::SendExternal { op_metadata, .. }
1418 | HydroRoot::DestSink { op_metadata, .. }
1419 | HydroRoot::CycleSink { op_metadata, .. }
1420 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1421 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1422 }
1423 }
1424
1425 pub fn input(&self) -> &HydroNode {
1426 match self {
1427 HydroRoot::ForEach { input, .. }
1428 | HydroRoot::SendExternal { input, .. }
1429 | HydroRoot::DestSink { input, .. }
1430 | HydroRoot::CycleSink { input, .. }
1431 | HydroRoot::EmbeddedOutput { input, .. }
1432 | HydroRoot::Null { input, .. } => input,
1433 }
1434 }
1435
1436 pub fn input_metadata(&self) -> &HydroIrMetadata {
1437 self.input().metadata()
1438 }
1439
1440 pub fn print_root(&self) -> String {
1441 match self {
1442 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1443 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1444 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1445 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1446 HydroRoot::EmbeddedOutput { ident, .. } => {
1447 format!("EmbeddedOutput({})", ident)
1448 }
1449 HydroRoot::Null { .. } => "Null".to_owned(),
1450 }
1451 }
1452
1453 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1454 match self {
1455 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1456 transform(f);
1457 }
1458 HydroRoot::SendExternal { .. }
1459 | HydroRoot::CycleSink { .. }
1460 | HydroRoot::EmbeddedOutput { .. }
1461 | HydroRoot::Null { .. } => {}
1462 }
1463 }
1464}
1465
1466#[cfg(feature = "build")]
1467fn tick_of(loc: &LocationId) -> Option<ClockId> {
1468 match loc {
1469 LocationId::Tick(id, _) => Some(*id),
1470 LocationId::Atomic(inner) => tick_of(inner),
1471 _ => None,
1472 }
1473}
1474
1475#[cfg(feature = "build")]
1476fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1477 match loc {
1478 LocationId::Tick(id, inner) => {
1479 *id = uf_find(uf, *id);
1480 remap_location(inner, uf);
1481 }
1482 LocationId::Atomic(inner) => {
1483 remap_location(inner, uf);
1484 }
1485 LocationId::Process(_) | LocationId::Cluster(_) => {}
1486 }
1487}
1488
1489#[cfg(feature = "build")]
1490fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1491 let p = *parent.get(&x).unwrap_or(&x);
1492 if p == x {
1493 return x;
1494 }
1495 let root = uf_find(parent, p);
1496 parent.insert(x, root);
1497 root
1498}
1499
1500#[cfg(feature = "build")]
1501fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1502 let ra = uf_find(parent, a);
1503 let rb = uf_find(parent, b);
1504 if ra != rb {
1505 parent.insert(ra, rb);
1506 }
1507}
1508
1509#[cfg(feature = "build")]
1513pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1514 let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1515
1516 transform_bottom_up(
1518 ir,
1519 &mut |_| {},
1520 &mut |node: &mut HydroNode| {
1521 if let HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } =
1522 node
1523 && let (Some(a), Some(b)) = (
1524 tick_of(&inner.metadata().location_id),
1525 tick_of(&metadata.location_id),
1526 )
1527 {
1528 uf_union(&mut uf, a, b);
1529 }
1530 },
1531 false,
1532 );
1533
1534 transform_bottom_up(
1536 ir,
1537 &mut |_| {},
1538 &mut |node: &mut HydroNode| {
1539 remap_location(&mut node.metadata_mut().location_id, &mut uf);
1540 },
1541 false,
1542 );
1543}
1544
1545#[cfg(feature = "build")]
1546pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1547 let mut builders = SecondaryMap::new();
1548 let mut seen_tees = HashMap::new();
1549 let mut built_tees = HashMap::new();
1550 let mut next_stmt_id = 0;
1551 for leaf in ir {
1552 leaf.emit(
1553 &mut builders,
1554 &mut seen_tees,
1555 &mut built_tees,
1556 &mut next_stmt_id,
1557 );
1558 }
1559 builders
1560}
1561
1562#[cfg(feature = "build")]
1563pub fn traverse_dfir(
1564 ir: &mut [HydroRoot],
1565 transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1566 transform_node: impl FnMut(&mut HydroNode, &mut usize),
1567) {
1568 let mut seen_tees = HashMap::new();
1569 let mut built_tees = HashMap::new();
1570 let mut next_stmt_id = 0;
1571 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1572 ir.iter_mut().for_each(|leaf| {
1573 leaf.emit_core(
1574 &mut callback,
1575 &mut seen_tees,
1576 &mut built_tees,
1577 &mut next_stmt_id,
1578 );
1579 });
1580}
1581
1582pub fn transform_bottom_up(
1583 ir: &mut [HydroRoot],
1584 transform_root: &mut impl FnMut(&mut HydroRoot),
1585 transform_node: &mut impl FnMut(&mut HydroNode),
1586 check_well_formed: bool,
1587) {
1588 let mut seen_tees = HashMap::new();
1589 ir.iter_mut().for_each(|leaf| {
1590 leaf.transform_bottom_up(
1591 transform_root,
1592 transform_node,
1593 &mut seen_tees,
1594 check_well_formed,
1595 );
1596 });
1597}
1598
1599pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1600 let mut seen_tees = HashMap::new();
1601 ir.iter()
1602 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1603 .collect()
1604}
1605
1606type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1607thread_local! {
1608 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1609}
1610
1611pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1612 PRINTED_TEES.with(|printed_tees| {
1613 let mut printed_tees_mut = printed_tees.borrow_mut();
1614 *printed_tees_mut = Some((0, HashMap::new()));
1615 drop(printed_tees_mut);
1616
1617 let ret = f();
1618
1619 let mut printed_tees_mut = printed_tees.borrow_mut();
1620 *printed_tees_mut = None;
1621
1622 ret
1623 })
1624}
1625
1626pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1627
1628impl SharedNode {
1629 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1630 Rc::as_ptr(&self.0)
1631 }
1632}
1633
1634impl Debug for SharedNode {
1635 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1636 PRINTED_TEES.with(|printed_tees| {
1637 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1638 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1639
1640 if let Some(printed_tees_mut) = printed_tees_mut {
1641 if let Some(existing) = printed_tees_mut
1642 .1
1643 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1644 {
1645 write!(f, "<shared {}>", existing)
1646 } else {
1647 let next_id = printed_tees_mut.0;
1648 printed_tees_mut.0 += 1;
1649 printed_tees_mut
1650 .1
1651 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1652 drop(printed_tees_mut_borrow);
1653 write!(f, "<shared {}>: ", next_id)?;
1654 Debug::fmt(&self.0.borrow(), f)
1655 }
1656 } else {
1657 drop(printed_tees_mut_borrow);
1658 write!(f, "<shared>: ")?;
1659 Debug::fmt(&self.0.borrow(), f)
1660 }
1661 })
1662 }
1663}
1664
1665impl Hash for SharedNode {
1666 fn hash<H: Hasher>(&self, state: &mut H) {
1667 self.0.borrow_mut().hash(state);
1668 }
1669}
1670
1671#[derive(Clone, PartialEq, Eq, Debug)]
1672pub enum BoundKind {
1673 Unbounded,
1674 Bounded,
1675}
1676
1677#[derive(Clone, PartialEq, Eq, Debug)]
1678pub enum StreamOrder {
1679 NoOrder,
1680 TotalOrder,
1681}
1682
1683#[derive(Clone, PartialEq, Eq, Debug)]
1684pub enum StreamRetry {
1685 AtLeastOnce,
1686 ExactlyOnce,
1687}
1688
1689#[derive(Clone, PartialEq, Eq, Debug)]
1690pub enum KeyedSingletonBoundKind {
1691 Unbounded,
1692 MonotonicValue,
1693 BoundedValue,
1694 Bounded,
1695}
1696
1697#[derive(Clone, PartialEq, Eq, Debug)]
1698pub enum SingletonBoundKind {
1699 Unbounded,
1700 Monotonic,
1701 Bounded,
1702}
1703
1704#[derive(Clone, PartialEq, Eq, Debug)]
1705pub enum CollectionKind {
1706 Stream {
1707 bound: BoundKind,
1708 order: StreamOrder,
1709 retry: StreamRetry,
1710 element_type: DebugType,
1711 },
1712 Singleton {
1713 bound: SingletonBoundKind,
1714 element_type: DebugType,
1715 },
1716 Optional {
1717 bound: BoundKind,
1718 element_type: DebugType,
1719 },
1720 KeyedStream {
1721 bound: BoundKind,
1722 value_order: StreamOrder,
1723 value_retry: StreamRetry,
1724 key_type: DebugType,
1725 value_type: DebugType,
1726 },
1727 KeyedSingleton {
1728 bound: KeyedSingletonBoundKind,
1729 key_type: DebugType,
1730 value_type: DebugType,
1731 },
1732}
1733
1734impl CollectionKind {
1735 pub fn is_bounded(&self) -> bool {
1736 matches!(
1737 self,
1738 CollectionKind::Stream {
1739 bound: BoundKind::Bounded,
1740 ..
1741 } | CollectionKind::Singleton {
1742 bound: SingletonBoundKind::Bounded,
1743 ..
1744 } | CollectionKind::Optional {
1745 bound: BoundKind::Bounded,
1746 ..
1747 } | CollectionKind::KeyedStream {
1748 bound: BoundKind::Bounded,
1749 ..
1750 } | CollectionKind::KeyedSingleton {
1751 bound: KeyedSingletonBoundKind::Bounded,
1752 ..
1753 }
1754 )
1755 }
1756}
1757
1758#[derive(Clone)]
1759pub struct HydroIrMetadata {
1760 pub location_id: LocationId,
1761 pub collection_kind: CollectionKind,
1762 pub cardinality: Option<usize>,
1763 pub tag: Option<String>,
1764 pub op: HydroIrOpMetadata,
1765}
1766
1767impl Hash for HydroIrMetadata {
1769 fn hash<H: Hasher>(&self, _: &mut H) {}
1770}
1771
1772impl PartialEq for HydroIrMetadata {
1773 fn eq(&self, _: &Self) -> bool {
1774 true
1775 }
1776}
1777
1778impl Eq for HydroIrMetadata {}
1779
1780impl Debug for HydroIrMetadata {
1781 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1782 f.debug_struct("HydroIrMetadata")
1783 .field("location_id", &self.location_id)
1784 .field("collection_kind", &self.collection_kind)
1785 .finish()
1786 }
1787}
1788
1789#[derive(Clone)]
1792pub struct HydroIrOpMetadata {
1793 pub backtrace: Backtrace,
1794 pub cpu_usage: Option<f64>,
1795 pub network_recv_cpu_usage: Option<f64>,
1796 pub id: Option<usize>,
1797}
1798
1799impl HydroIrOpMetadata {
1800 #[expect(
1801 clippy::new_without_default,
1802 reason = "explicit calls to new ensure correct backtrace bounds"
1803 )]
1804 pub fn new() -> HydroIrOpMetadata {
1805 Self::new_with_skip(1)
1806 }
1807
1808 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1809 HydroIrOpMetadata {
1810 backtrace: Backtrace::get_backtrace(2 + skip_count),
1811 cpu_usage: None,
1812 network_recv_cpu_usage: None,
1813 id: None,
1814 }
1815 }
1816}
1817
1818impl Debug for HydroIrOpMetadata {
1819 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1820 f.debug_struct("HydroIrOpMetadata").finish()
1821 }
1822}
1823
1824impl Hash for HydroIrOpMetadata {
1825 fn hash<H: Hasher>(&self, _: &mut H) {}
1826}
1827
1828#[derive(Debug, Hash)]
1831pub enum HydroNode {
1832 Placeholder,
1833
1834 Cast {
1842 inner: Box<HydroNode>,
1843 metadata: HydroIrMetadata,
1844 },
1845
1846 ObserveNonDet {
1852 inner: Box<HydroNode>,
1853 trusted: bool, metadata: HydroIrMetadata,
1855 },
1856
1857 Source {
1858 source: HydroSource,
1859 metadata: HydroIrMetadata,
1860 },
1861
1862 SingletonSource {
1863 value: DebugExpr,
1864 first_tick_only: bool,
1865 metadata: HydroIrMetadata,
1866 },
1867
1868 CycleSource {
1869 cycle_id: CycleId,
1870 metadata: HydroIrMetadata,
1871 },
1872
1873 Tee {
1874 inner: SharedNode,
1875 metadata: HydroIrMetadata,
1876 },
1877
1878 Partition {
1879 inner: SharedNode,
1880 f: DebugExpr,
1881 is_true: bool,
1882 metadata: HydroIrMetadata,
1883 },
1884
1885 BeginAtomic {
1886 inner: Box<HydroNode>,
1887 metadata: HydroIrMetadata,
1888 },
1889
1890 EndAtomic {
1891 inner: Box<HydroNode>,
1892 metadata: HydroIrMetadata,
1893 },
1894
1895 Batch {
1896 inner: Box<HydroNode>,
1897 metadata: HydroIrMetadata,
1898 },
1899
1900 YieldConcat {
1901 inner: Box<HydroNode>,
1902 metadata: HydroIrMetadata,
1903 },
1904
1905 Chain {
1906 first: Box<HydroNode>,
1907 second: Box<HydroNode>,
1908 metadata: HydroIrMetadata,
1909 },
1910
1911 ChainFirst {
1912 first: Box<HydroNode>,
1913 second: Box<HydroNode>,
1914 metadata: HydroIrMetadata,
1915 },
1916
1917 CrossProduct {
1918 left: Box<HydroNode>,
1919 right: Box<HydroNode>,
1920 metadata: HydroIrMetadata,
1921 },
1922
1923 CrossSingleton {
1924 left: Box<HydroNode>,
1925 right: Box<HydroNode>,
1926 metadata: HydroIrMetadata,
1927 },
1928
1929 Join {
1930 left: Box<HydroNode>,
1931 right: Box<HydroNode>,
1932 metadata: HydroIrMetadata,
1933 },
1934
1935 Difference {
1936 pos: Box<HydroNode>,
1937 neg: Box<HydroNode>,
1938 metadata: HydroIrMetadata,
1939 },
1940
1941 AntiJoin {
1942 pos: Box<HydroNode>,
1943 neg: Box<HydroNode>,
1944 metadata: HydroIrMetadata,
1945 },
1946
1947 ResolveFutures {
1948 input: Box<HydroNode>,
1949 metadata: HydroIrMetadata,
1950 },
1951 ResolveFuturesBlocking {
1952 input: Box<HydroNode>,
1953 metadata: HydroIrMetadata,
1954 },
1955 ResolveFuturesOrdered {
1956 input: Box<HydroNode>,
1957 metadata: HydroIrMetadata,
1958 },
1959
1960 Map {
1961 f: DebugExpr,
1962 input: Box<HydroNode>,
1963 metadata: HydroIrMetadata,
1964 },
1965 FlatMap {
1966 f: DebugExpr,
1967 input: Box<HydroNode>,
1968 metadata: HydroIrMetadata,
1969 },
1970 FlatMapStreamBlocking {
1971 f: DebugExpr,
1972 input: Box<HydroNode>,
1973 metadata: HydroIrMetadata,
1974 },
1975 Filter {
1976 f: DebugExpr,
1977 input: Box<HydroNode>,
1978 metadata: HydroIrMetadata,
1979 },
1980 FilterMap {
1981 f: DebugExpr,
1982 input: Box<HydroNode>,
1983 metadata: HydroIrMetadata,
1984 },
1985
1986 DeferTick {
1987 input: Box<HydroNode>,
1988 metadata: HydroIrMetadata,
1989 },
1990 Enumerate {
1991 input: Box<HydroNode>,
1992 metadata: HydroIrMetadata,
1993 },
1994 Inspect {
1995 f: DebugExpr,
1996 input: Box<HydroNode>,
1997 metadata: HydroIrMetadata,
1998 },
1999
2000 Unique {
2001 input: Box<HydroNode>,
2002 metadata: HydroIrMetadata,
2003 },
2004
2005 Sort {
2006 input: Box<HydroNode>,
2007 metadata: HydroIrMetadata,
2008 },
2009 Fold {
2010 init: DebugExpr,
2011 acc: DebugExpr,
2012 input: Box<HydroNode>,
2013 metadata: HydroIrMetadata,
2014 },
2015
2016 Scan {
2017 init: DebugExpr,
2018 acc: DebugExpr,
2019 input: Box<HydroNode>,
2020 metadata: HydroIrMetadata,
2021 },
2022 ScanAsyncBlocking {
2023 init: DebugExpr,
2024 acc: DebugExpr,
2025 input: Box<HydroNode>,
2026 metadata: HydroIrMetadata,
2027 },
2028 FoldKeyed {
2029 init: DebugExpr,
2030 acc: DebugExpr,
2031 input: Box<HydroNode>,
2032 metadata: HydroIrMetadata,
2033 },
2034
2035 Reduce {
2036 f: DebugExpr,
2037 input: Box<HydroNode>,
2038 metadata: HydroIrMetadata,
2039 },
2040 ReduceKeyed {
2041 f: DebugExpr,
2042 input: Box<HydroNode>,
2043 metadata: HydroIrMetadata,
2044 },
2045 ReduceKeyedWatermark {
2046 f: DebugExpr,
2047 input: Box<HydroNode>,
2048 watermark: Box<HydroNode>,
2049 metadata: HydroIrMetadata,
2050 },
2051
2052 Network {
2053 name: Option<String>,
2054 networking_info: crate::networking::NetworkingInfo,
2055 serialize_fn: Option<DebugExpr>,
2056 instantiate_fn: DebugInstantiate,
2057 deserialize_fn: Option<DebugExpr>,
2058 input: Box<HydroNode>,
2059 metadata: HydroIrMetadata,
2060 },
2061
2062 ExternalInput {
2063 from_external_key: LocationKey,
2064 from_port_id: ExternalPortId,
2065 from_many: bool,
2066 codec_type: DebugType,
2067 port_hint: NetworkHint,
2068 instantiate_fn: DebugInstantiate,
2069 deserialize_fn: Option<DebugExpr>,
2070 metadata: HydroIrMetadata,
2071 },
2072
2073 Counter {
2074 tag: String,
2075 duration: DebugExpr,
2076 prefix: String,
2077 input: Box<HydroNode>,
2078 metadata: HydroIrMetadata,
2079 },
2080}
2081
2082pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2083pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2084
2085impl HydroNode {
2086 pub fn transform_bottom_up(
2087 &mut self,
2088 transform: &mut impl FnMut(&mut HydroNode),
2089 seen_tees: &mut SeenSharedNodes,
2090 check_well_formed: bool,
2091 ) {
2092 self.transform_children(
2093 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2094 seen_tees,
2095 );
2096
2097 transform(self);
2098
2099 let self_location = self.metadata().location_id.root();
2100
2101 if check_well_formed {
2102 match &*self {
2103 HydroNode::Network { .. } => {}
2104 _ => {
2105 self.input_metadata().iter().for_each(|i| {
2106 if i.location_id.root() != self_location {
2107 panic!(
2108 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2109 i,
2110 i.location_id.root(),
2111 self,
2112 self_location
2113 )
2114 }
2115 });
2116 }
2117 }
2118 }
2119 }
2120
2121 #[inline(always)]
2122 pub fn transform_children(
2123 &mut self,
2124 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2125 seen_tees: &mut SeenSharedNodes,
2126 ) {
2127 match self {
2128 HydroNode::Placeholder => {
2129 panic!();
2130 }
2131
2132 HydroNode::Source { .. }
2133 | HydroNode::SingletonSource { .. }
2134 | HydroNode::CycleSource { .. }
2135 | HydroNode::ExternalInput { .. } => {}
2136
2137 HydroNode::Tee { inner, .. } => {
2138 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2139 *inner = SharedNode(transformed.clone());
2140 } else {
2141 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2142 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2143 let mut orig = inner.0.replace(HydroNode::Placeholder);
2144 transform(&mut orig, seen_tees);
2145 *transformed_cell.borrow_mut() = orig;
2146 *inner = SharedNode(transformed_cell);
2147 }
2148 }
2149
2150 HydroNode::Partition { inner, .. } => {
2151 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2152 *inner = SharedNode(transformed.clone());
2153 } else {
2154 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2155 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2156 let mut orig = inner.0.replace(HydroNode::Placeholder);
2157 transform(&mut orig, seen_tees);
2158 *transformed_cell.borrow_mut() = orig;
2159 *inner = SharedNode(transformed_cell);
2160 }
2161 }
2162
2163 HydroNode::Cast { inner, .. }
2164 | HydroNode::ObserveNonDet { inner, .. }
2165 | HydroNode::BeginAtomic { inner, .. }
2166 | HydroNode::EndAtomic { inner, .. }
2167 | HydroNode::Batch { inner, .. }
2168 | HydroNode::YieldConcat { inner, .. } => {
2169 transform(inner.as_mut(), seen_tees);
2170 }
2171
2172 HydroNode::Chain { first, second, .. } => {
2173 transform(first.as_mut(), seen_tees);
2174 transform(second.as_mut(), seen_tees);
2175 }
2176
2177 HydroNode::ChainFirst { first, second, .. } => {
2178 transform(first.as_mut(), seen_tees);
2179 transform(second.as_mut(), seen_tees);
2180 }
2181
2182 HydroNode::CrossSingleton { left, right, .. }
2183 | HydroNode::CrossProduct { left, right, .. }
2184 | HydroNode::Join { left, right, .. } => {
2185 transform(left.as_mut(), seen_tees);
2186 transform(right.as_mut(), seen_tees);
2187 }
2188
2189 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2190 transform(pos.as_mut(), seen_tees);
2191 transform(neg.as_mut(), seen_tees);
2192 }
2193
2194 HydroNode::ReduceKeyedWatermark {
2195 input, watermark, ..
2196 } => {
2197 transform(input.as_mut(), seen_tees);
2198 transform(watermark.as_mut(), seen_tees);
2199 }
2200
2201 HydroNode::Map { input, .. }
2202 | HydroNode::ResolveFutures { input, .. }
2203 | HydroNode::ResolveFuturesBlocking { input, .. }
2204 | HydroNode::ResolveFuturesOrdered { input, .. }
2205 | HydroNode::FlatMap { input, .. }
2206 | HydroNode::FlatMapStreamBlocking { input, .. }
2207 | HydroNode::Filter { input, .. }
2208 | HydroNode::FilterMap { input, .. }
2209 | HydroNode::Sort { input, .. }
2210 | HydroNode::DeferTick { input, .. }
2211 | HydroNode::Enumerate { input, .. }
2212 | HydroNode::Inspect { input, .. }
2213 | HydroNode::Unique { input, .. }
2214 | HydroNode::Network { input, .. }
2215 | HydroNode::Fold { input, .. }
2216 | HydroNode::Scan { input, .. }
2217 | HydroNode::ScanAsyncBlocking { input, .. }
2218 | HydroNode::FoldKeyed { input, .. }
2219 | HydroNode::Reduce { input, .. }
2220 | HydroNode::ReduceKeyed { input, .. }
2221 | HydroNode::Counter { input, .. } => {
2222 transform(input.as_mut(), seen_tees);
2223 }
2224 }
2225 }
2226
2227 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2228 match self {
2229 HydroNode::Placeholder => HydroNode::Placeholder,
2230 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2231 inner: Box::new(inner.deep_clone(seen_tees)),
2232 metadata: metadata.clone(),
2233 },
2234 HydroNode::ObserveNonDet {
2235 inner,
2236 trusted,
2237 metadata,
2238 } => HydroNode::ObserveNonDet {
2239 inner: Box::new(inner.deep_clone(seen_tees)),
2240 trusted: *trusted,
2241 metadata: metadata.clone(),
2242 },
2243 HydroNode::Source { source, metadata } => HydroNode::Source {
2244 source: source.clone(),
2245 metadata: metadata.clone(),
2246 },
2247 HydroNode::SingletonSource {
2248 value,
2249 first_tick_only,
2250 metadata,
2251 } => HydroNode::SingletonSource {
2252 value: value.clone(),
2253 first_tick_only: *first_tick_only,
2254 metadata: metadata.clone(),
2255 },
2256 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2257 cycle_id: *cycle_id,
2258 metadata: metadata.clone(),
2259 },
2260 HydroNode::Tee { inner, metadata } => {
2261 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2262 HydroNode::Tee {
2263 inner: SharedNode(transformed.clone()),
2264 metadata: metadata.clone(),
2265 }
2266 } else {
2267 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2268 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2269 let cloned = inner.0.borrow().deep_clone(seen_tees);
2270 *new_rc.borrow_mut() = cloned;
2271 HydroNode::Tee {
2272 inner: SharedNode(new_rc),
2273 metadata: metadata.clone(),
2274 }
2275 }
2276 }
2277 HydroNode::Partition {
2278 inner,
2279 f,
2280 is_true,
2281 metadata,
2282 } => {
2283 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2284 HydroNode::Partition {
2285 inner: SharedNode(transformed.clone()),
2286 f: f.clone(),
2287 is_true: *is_true,
2288 metadata: metadata.clone(),
2289 }
2290 } else {
2291 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2292 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2293 let cloned = inner.0.borrow().deep_clone(seen_tees);
2294 *new_rc.borrow_mut() = cloned;
2295 HydroNode::Partition {
2296 inner: SharedNode(new_rc),
2297 f: f.clone(),
2298 is_true: *is_true,
2299 metadata: metadata.clone(),
2300 }
2301 }
2302 }
2303 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2304 inner: Box::new(inner.deep_clone(seen_tees)),
2305 metadata: metadata.clone(),
2306 },
2307 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2308 inner: Box::new(inner.deep_clone(seen_tees)),
2309 metadata: metadata.clone(),
2310 },
2311 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2312 inner: Box::new(inner.deep_clone(seen_tees)),
2313 metadata: metadata.clone(),
2314 },
2315 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2316 inner: Box::new(inner.deep_clone(seen_tees)),
2317 metadata: metadata.clone(),
2318 },
2319 HydroNode::Chain {
2320 first,
2321 second,
2322 metadata,
2323 } => HydroNode::Chain {
2324 first: Box::new(first.deep_clone(seen_tees)),
2325 second: Box::new(second.deep_clone(seen_tees)),
2326 metadata: metadata.clone(),
2327 },
2328 HydroNode::ChainFirst {
2329 first,
2330 second,
2331 metadata,
2332 } => HydroNode::ChainFirst {
2333 first: Box::new(first.deep_clone(seen_tees)),
2334 second: Box::new(second.deep_clone(seen_tees)),
2335 metadata: metadata.clone(),
2336 },
2337 HydroNode::CrossProduct {
2338 left,
2339 right,
2340 metadata,
2341 } => HydroNode::CrossProduct {
2342 left: Box::new(left.deep_clone(seen_tees)),
2343 right: Box::new(right.deep_clone(seen_tees)),
2344 metadata: metadata.clone(),
2345 },
2346 HydroNode::CrossSingleton {
2347 left,
2348 right,
2349 metadata,
2350 } => HydroNode::CrossSingleton {
2351 left: Box::new(left.deep_clone(seen_tees)),
2352 right: Box::new(right.deep_clone(seen_tees)),
2353 metadata: metadata.clone(),
2354 },
2355 HydroNode::Join {
2356 left,
2357 right,
2358 metadata,
2359 } => HydroNode::Join {
2360 left: Box::new(left.deep_clone(seen_tees)),
2361 right: Box::new(right.deep_clone(seen_tees)),
2362 metadata: metadata.clone(),
2363 },
2364 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2365 pos: Box::new(pos.deep_clone(seen_tees)),
2366 neg: Box::new(neg.deep_clone(seen_tees)),
2367 metadata: metadata.clone(),
2368 },
2369 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2370 pos: Box::new(pos.deep_clone(seen_tees)),
2371 neg: Box::new(neg.deep_clone(seen_tees)),
2372 metadata: metadata.clone(),
2373 },
2374 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2375 input: Box::new(input.deep_clone(seen_tees)),
2376 metadata: metadata.clone(),
2377 },
2378 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2379 HydroNode::ResolveFuturesBlocking {
2380 input: Box::new(input.deep_clone(seen_tees)),
2381 metadata: metadata.clone(),
2382 }
2383 }
2384 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2385 HydroNode::ResolveFuturesOrdered {
2386 input: Box::new(input.deep_clone(seen_tees)),
2387 metadata: metadata.clone(),
2388 }
2389 }
2390 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2391 f: f.clone(),
2392 input: Box::new(input.deep_clone(seen_tees)),
2393 metadata: metadata.clone(),
2394 },
2395 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2396 f: f.clone(),
2397 input: Box::new(input.deep_clone(seen_tees)),
2398 metadata: metadata.clone(),
2399 },
2400 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2401 HydroNode::FlatMapStreamBlocking {
2402 f: f.clone(),
2403 input: Box::new(input.deep_clone(seen_tees)),
2404 metadata: metadata.clone(),
2405 }
2406 }
2407 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2408 f: f.clone(),
2409 input: Box::new(input.deep_clone(seen_tees)),
2410 metadata: metadata.clone(),
2411 },
2412 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2413 f: f.clone(),
2414 input: Box::new(input.deep_clone(seen_tees)),
2415 metadata: metadata.clone(),
2416 },
2417 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2418 input: Box::new(input.deep_clone(seen_tees)),
2419 metadata: metadata.clone(),
2420 },
2421 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2422 input: Box::new(input.deep_clone(seen_tees)),
2423 metadata: metadata.clone(),
2424 },
2425 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2426 f: f.clone(),
2427 input: Box::new(input.deep_clone(seen_tees)),
2428 metadata: metadata.clone(),
2429 },
2430 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2431 input: Box::new(input.deep_clone(seen_tees)),
2432 metadata: metadata.clone(),
2433 },
2434 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2435 input: Box::new(input.deep_clone(seen_tees)),
2436 metadata: metadata.clone(),
2437 },
2438 HydroNode::Fold {
2439 init,
2440 acc,
2441 input,
2442 metadata,
2443 } => HydroNode::Fold {
2444 init: init.clone(),
2445 acc: acc.clone(),
2446 input: Box::new(input.deep_clone(seen_tees)),
2447 metadata: metadata.clone(),
2448 },
2449 HydroNode::Scan {
2450 init,
2451 acc,
2452 input,
2453 metadata,
2454 } => HydroNode::Scan {
2455 init: init.clone(),
2456 acc: acc.clone(),
2457 input: Box::new(input.deep_clone(seen_tees)),
2458 metadata: metadata.clone(),
2459 },
2460 HydroNode::ScanAsyncBlocking {
2461 init,
2462 acc,
2463 input,
2464 metadata,
2465 } => HydroNode::ScanAsyncBlocking {
2466 init: init.clone(),
2467 acc: acc.clone(),
2468 input: Box::new(input.deep_clone(seen_tees)),
2469 metadata: metadata.clone(),
2470 },
2471 HydroNode::FoldKeyed {
2472 init,
2473 acc,
2474 input,
2475 metadata,
2476 } => HydroNode::FoldKeyed {
2477 init: init.clone(),
2478 acc: acc.clone(),
2479 input: Box::new(input.deep_clone(seen_tees)),
2480 metadata: metadata.clone(),
2481 },
2482 HydroNode::ReduceKeyedWatermark {
2483 f,
2484 input,
2485 watermark,
2486 metadata,
2487 } => HydroNode::ReduceKeyedWatermark {
2488 f: f.clone(),
2489 input: Box::new(input.deep_clone(seen_tees)),
2490 watermark: Box::new(watermark.deep_clone(seen_tees)),
2491 metadata: metadata.clone(),
2492 },
2493 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2494 f: f.clone(),
2495 input: Box::new(input.deep_clone(seen_tees)),
2496 metadata: metadata.clone(),
2497 },
2498 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2499 f: f.clone(),
2500 input: Box::new(input.deep_clone(seen_tees)),
2501 metadata: metadata.clone(),
2502 },
2503 HydroNode::Network {
2504 name,
2505 networking_info,
2506 serialize_fn,
2507 instantiate_fn,
2508 deserialize_fn,
2509 input,
2510 metadata,
2511 } => HydroNode::Network {
2512 name: name.clone(),
2513 networking_info: networking_info.clone(),
2514 serialize_fn: serialize_fn.clone(),
2515 instantiate_fn: instantiate_fn.clone(),
2516 deserialize_fn: deserialize_fn.clone(),
2517 input: Box::new(input.deep_clone(seen_tees)),
2518 metadata: metadata.clone(),
2519 },
2520 HydroNode::ExternalInput {
2521 from_external_key,
2522 from_port_id,
2523 from_many,
2524 codec_type,
2525 port_hint,
2526 instantiate_fn,
2527 deserialize_fn,
2528 metadata,
2529 } => HydroNode::ExternalInput {
2530 from_external_key: *from_external_key,
2531 from_port_id: *from_port_id,
2532 from_many: *from_many,
2533 codec_type: codec_type.clone(),
2534 port_hint: *port_hint,
2535 instantiate_fn: instantiate_fn.clone(),
2536 deserialize_fn: deserialize_fn.clone(),
2537 metadata: metadata.clone(),
2538 },
2539 HydroNode::Counter {
2540 tag,
2541 duration,
2542 prefix,
2543 input,
2544 metadata,
2545 } => HydroNode::Counter {
2546 tag: tag.clone(),
2547 duration: duration.clone(),
2548 prefix: prefix.clone(),
2549 input: Box::new(input.deep_clone(seen_tees)),
2550 metadata: metadata.clone(),
2551 },
2552 }
2553 }
2554
2555 #[cfg(feature = "build")]
2556 pub fn emit_core(
2557 &mut self,
2558 builders_or_callback: &mut BuildersOrCallback<
2559 impl FnMut(&mut HydroRoot, &mut usize),
2560 impl FnMut(&mut HydroNode, &mut usize),
2561 >,
2562 seen_tees: &mut SeenSharedNodes,
2563 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2564 next_stmt_id: &mut usize,
2565 ) -> syn::Ident {
2566 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2567
2568 self.transform_bottom_up(
2569 &mut |node: &mut HydroNode| {
2570 let out_location = node.metadata().location_id.clone();
2571 match node {
2572 HydroNode::Placeholder => {
2573 panic!()
2574 }
2575
2576 HydroNode::Cast { .. } => {
2577 match builders_or_callback {
2580 BuildersOrCallback::Builders(_) => {}
2581 BuildersOrCallback::Callback(_, node_callback) => {
2582 node_callback(node, next_stmt_id);
2583 }
2584 }
2585
2586 *next_stmt_id += 1;
2587 }
2589
2590 HydroNode::ObserveNonDet {
2591 inner,
2592 trusted,
2593 metadata,
2594 ..
2595 } => {
2596 let inner_ident = ident_stack.pop().unwrap();
2597
2598 let observe_ident =
2599 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2600
2601 match builders_or_callback {
2602 BuildersOrCallback::Builders(graph_builders) => {
2603 graph_builders.observe_nondet(
2604 *trusted,
2605 &inner.metadata().location_id,
2606 inner_ident,
2607 &inner.metadata().collection_kind,
2608 &observe_ident,
2609 &metadata.collection_kind,
2610 &metadata.op,
2611 );
2612 }
2613 BuildersOrCallback::Callback(_, node_callback) => {
2614 node_callback(node, next_stmt_id);
2615 }
2616 }
2617
2618 *next_stmt_id += 1;
2619
2620 ident_stack.push(observe_ident);
2621 }
2622
2623 HydroNode::Batch {
2624 inner, metadata, ..
2625 } => {
2626 let inner_ident = ident_stack.pop().unwrap();
2627
2628 let batch_ident =
2629 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2630
2631 match builders_or_callback {
2632 BuildersOrCallback::Builders(graph_builders) => {
2633 graph_builders.batch(
2634 inner_ident,
2635 &inner.metadata().location_id,
2636 &inner.metadata().collection_kind,
2637 &batch_ident,
2638 &out_location,
2639 &metadata.op,
2640 );
2641 }
2642 BuildersOrCallback::Callback(_, node_callback) => {
2643 node_callback(node, next_stmt_id);
2644 }
2645 }
2646
2647 *next_stmt_id += 1;
2648
2649 ident_stack.push(batch_ident);
2650 }
2651
2652 HydroNode::YieldConcat { inner, .. } => {
2653 let inner_ident = ident_stack.pop().unwrap();
2654
2655 let yield_ident =
2656 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2657
2658 match builders_or_callback {
2659 BuildersOrCallback::Builders(graph_builders) => {
2660 graph_builders.yield_from_tick(
2661 inner_ident,
2662 &inner.metadata().location_id,
2663 &inner.metadata().collection_kind,
2664 &yield_ident,
2665 &out_location,
2666 );
2667 }
2668 BuildersOrCallback::Callback(_, node_callback) => {
2669 node_callback(node, next_stmt_id);
2670 }
2671 }
2672
2673 *next_stmt_id += 1;
2674
2675 ident_stack.push(yield_ident);
2676 }
2677
2678 HydroNode::BeginAtomic { inner, metadata } => {
2679 let inner_ident = ident_stack.pop().unwrap();
2680
2681 let begin_ident =
2682 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2683
2684 match builders_or_callback {
2685 BuildersOrCallback::Builders(graph_builders) => {
2686 graph_builders.begin_atomic(
2687 inner_ident,
2688 &inner.metadata().location_id,
2689 &inner.metadata().collection_kind,
2690 &begin_ident,
2691 &out_location,
2692 &metadata.op,
2693 );
2694 }
2695 BuildersOrCallback::Callback(_, node_callback) => {
2696 node_callback(node, next_stmt_id);
2697 }
2698 }
2699
2700 *next_stmt_id += 1;
2701
2702 ident_stack.push(begin_ident);
2703 }
2704
2705 HydroNode::EndAtomic { inner, .. } => {
2706 let inner_ident = ident_stack.pop().unwrap();
2707
2708 let end_ident =
2709 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2710
2711 match builders_or_callback {
2712 BuildersOrCallback::Builders(graph_builders) => {
2713 graph_builders.end_atomic(
2714 inner_ident,
2715 &inner.metadata().location_id,
2716 &inner.metadata().collection_kind,
2717 &end_ident,
2718 );
2719 }
2720 BuildersOrCallback::Callback(_, node_callback) => {
2721 node_callback(node, next_stmt_id);
2722 }
2723 }
2724
2725 *next_stmt_id += 1;
2726
2727 ident_stack.push(end_ident);
2728 }
2729
2730 HydroNode::Source {
2731 source, metadata, ..
2732 } => {
2733 if let HydroSource::ExternalNetwork() = source {
2734 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2735 } else {
2736 let source_ident =
2737 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2738
2739 let source_stmt = match source {
2740 HydroSource::Stream(expr) => {
2741 debug_assert!(metadata.location_id.is_top_level());
2742 parse_quote! {
2743 #source_ident = source_stream(#expr);
2744 }
2745 }
2746
2747 HydroSource::ExternalNetwork() => {
2748 unreachable!()
2749 }
2750
2751 HydroSource::Iter(expr) => {
2752 if metadata.location_id.is_top_level() {
2753 parse_quote! {
2754 #source_ident = source_iter(#expr);
2755 }
2756 } else {
2757 parse_quote! {
2759 #source_ident = source_iter(#expr) -> persist::<'static>();
2760 }
2761 }
2762 }
2763
2764 HydroSource::Spin() => {
2765 debug_assert!(metadata.location_id.is_top_level());
2766 parse_quote! {
2767 #source_ident = spin();
2768 }
2769 }
2770
2771 HydroSource::ClusterMembers(target_loc, state) => {
2772 debug_assert!(metadata.location_id.is_top_level());
2773
2774 let members_tee_ident = syn::Ident::new(
2775 &format!(
2776 "__cluster_members_tee_{}_{}",
2777 metadata.location_id.root().key(),
2778 target_loc.key(),
2779 ),
2780 Span::call_site(),
2781 );
2782
2783 match state {
2784 ClusterMembersState::Stream(d) => {
2785 parse_quote! {
2786 #members_tee_ident = source_stream(#d) -> tee();
2787 #source_ident = #members_tee_ident;
2788 }
2789 },
2790 ClusterMembersState::Uninit => syn::parse_quote! {
2791 #source_ident = source_stream(DUMMY);
2792 },
2793 ClusterMembersState::Tee(..) => parse_quote! {
2794 #source_ident = #members_tee_ident;
2795 },
2796 }
2797 }
2798
2799 HydroSource::Embedded(ident) => {
2800 parse_quote! {
2801 #source_ident = source_stream(#ident);
2802 }
2803 }
2804
2805 HydroSource::EmbeddedSingleton(ident) => {
2806 parse_quote! {
2807 #source_ident = source_iter([#ident]);
2808 }
2809 }
2810 };
2811
2812 match builders_or_callback {
2813 BuildersOrCallback::Builders(graph_builders) => {
2814 let builder = graph_builders.get_dfir_mut(&out_location);
2815 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2816 }
2817 BuildersOrCallback::Callback(_, node_callback) => {
2818 node_callback(node, next_stmt_id);
2819 }
2820 }
2821
2822 *next_stmt_id += 1;
2823
2824 ident_stack.push(source_ident);
2825 }
2826 }
2827
2828 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2829 let source_ident =
2830 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2831
2832 match builders_or_callback {
2833 BuildersOrCallback::Builders(graph_builders) => {
2834 let builder = graph_builders.get_dfir_mut(&out_location);
2835
2836 if *first_tick_only {
2837 assert!(
2838 !metadata.location_id.is_top_level(),
2839 "first_tick_only SingletonSource must be inside a tick"
2840 );
2841 }
2842
2843 if *first_tick_only
2844 || (metadata.location_id.is_top_level()
2845 && metadata.collection_kind.is_bounded())
2846 {
2847 builder.add_dfir(
2848 parse_quote! {
2849 #source_ident = source_iter([#value]);
2850 },
2851 None,
2852 Some(&next_stmt_id.to_string()),
2853 );
2854 } else {
2855 builder.add_dfir(
2856 parse_quote! {
2857 #source_ident = source_iter([#value]) -> persist::<'static>();
2858 },
2859 None,
2860 Some(&next_stmt_id.to_string()),
2861 );
2862 }
2863 }
2864 BuildersOrCallback::Callback(_, node_callback) => {
2865 node_callback(node, next_stmt_id);
2866 }
2867 }
2868
2869 *next_stmt_id += 1;
2870
2871 ident_stack.push(source_ident);
2872 }
2873
2874 HydroNode::CycleSource { cycle_id, .. } => {
2875 let ident = cycle_id.as_ident();
2876
2877 match builders_or_callback {
2878 BuildersOrCallback::Builders(_) => {}
2879 BuildersOrCallback::Callback(_, node_callback) => {
2880 node_callback(node, next_stmt_id);
2881 }
2882 }
2883
2884 *next_stmt_id += 1;
2886
2887 ident_stack.push(ident);
2888 }
2889
2890 HydroNode::Tee { inner, .. } => {
2891 let ret_ident = if let Some(built_idents) =
2892 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2893 {
2894 match builders_or_callback {
2895 BuildersOrCallback::Builders(_) => {}
2896 BuildersOrCallback::Callback(_, node_callback) => {
2897 node_callback(node, next_stmt_id);
2898 }
2899 }
2900
2901 built_idents[0].clone()
2902 } else {
2903 let inner_ident = ident_stack.pop().unwrap();
2906
2907 let tee_ident =
2908 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2909
2910 built_tees.insert(
2911 inner.0.as_ref() as *const RefCell<HydroNode>,
2912 vec![tee_ident.clone()],
2913 );
2914
2915 match builders_or_callback {
2916 BuildersOrCallback::Builders(graph_builders) => {
2917 let builder = graph_builders.get_dfir_mut(&out_location);
2918 builder.add_dfir(
2919 parse_quote! {
2920 #tee_ident = #inner_ident -> tee();
2921 },
2922 None,
2923 Some(&next_stmt_id.to_string()),
2924 );
2925 }
2926 BuildersOrCallback::Callback(_, node_callback) => {
2927 node_callback(node, next_stmt_id);
2928 }
2929 }
2930
2931 tee_ident
2932 };
2933
2934 *next_stmt_id += 1;
2938 ident_stack.push(ret_ident);
2939 }
2940
2941 HydroNode::Partition {
2942 inner, f, is_true, ..
2943 } => {
2944 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
2946 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
2947 match builders_or_callback {
2948 BuildersOrCallback::Builders(_) => {}
2949 BuildersOrCallback::Callback(_, node_callback) => {
2950 node_callback(node, next_stmt_id);
2951 }
2952 }
2953
2954 let idx = if is_true { 0 } else { 1 };
2955 built_idents[idx].clone()
2956 } else {
2957 let inner_ident = ident_stack.pop().unwrap();
2960
2961 let partition_ident = syn::Ident::new(
2962 &format!("stream_{}_partition", *next_stmt_id),
2963 Span::call_site(),
2964 );
2965 let true_ident = syn::Ident::new(
2966 &format!("stream_{}_true", *next_stmt_id),
2967 Span::call_site(),
2968 );
2969 let false_ident = syn::Ident::new(
2970 &format!("stream_{}_false", *next_stmt_id),
2971 Span::call_site(),
2972 );
2973
2974 built_tees.insert(
2975 ptr,
2976 vec![true_ident.clone(), false_ident.clone()],
2977 );
2978
2979 match builders_or_callback {
2980 BuildersOrCallback::Builders(graph_builders) => {
2981 let builder = graph_builders.get_dfir_mut(&out_location);
2982 builder.add_dfir(
2983 parse_quote! {
2984 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
2985 #true_ident = #partition_ident[0];
2986 #false_ident = #partition_ident[1];
2987 },
2988 None,
2989 Some(&next_stmt_id.to_string()),
2990 );
2991 }
2992 BuildersOrCallback::Callback(_, node_callback) => {
2993 node_callback(node, next_stmt_id);
2994 }
2995 }
2996
2997 if is_true { true_ident } else { false_ident }
2998 };
2999
3000 *next_stmt_id += 1;
3001 ident_stack.push(ret_ident);
3002 }
3003
3004 HydroNode::Chain { .. } => {
3005 let second_ident = ident_stack.pop().unwrap();
3007 let first_ident = ident_stack.pop().unwrap();
3008
3009 let chain_ident =
3010 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3011
3012 match builders_or_callback {
3013 BuildersOrCallback::Builders(graph_builders) => {
3014 let builder = graph_builders.get_dfir_mut(&out_location);
3015 builder.add_dfir(
3016 parse_quote! {
3017 #chain_ident = chain();
3018 #first_ident -> [0]#chain_ident;
3019 #second_ident -> [1]#chain_ident;
3020 },
3021 None,
3022 Some(&next_stmt_id.to_string()),
3023 );
3024 }
3025 BuildersOrCallback::Callback(_, node_callback) => {
3026 node_callback(node, next_stmt_id);
3027 }
3028 }
3029
3030 *next_stmt_id += 1;
3031
3032 ident_stack.push(chain_ident);
3033 }
3034
3035 HydroNode::ChainFirst { .. } => {
3036 let second_ident = ident_stack.pop().unwrap();
3037 let first_ident = ident_stack.pop().unwrap();
3038
3039 let chain_ident =
3040 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3041
3042 match builders_or_callback {
3043 BuildersOrCallback::Builders(graph_builders) => {
3044 let builder = graph_builders.get_dfir_mut(&out_location);
3045 builder.add_dfir(
3046 parse_quote! {
3047 #chain_ident = chain_first_n(1);
3048 #first_ident -> [0]#chain_ident;
3049 #second_ident -> [1]#chain_ident;
3050 },
3051 None,
3052 Some(&next_stmt_id.to_string()),
3053 );
3054 }
3055 BuildersOrCallback::Callback(_, node_callback) => {
3056 node_callback(node, next_stmt_id);
3057 }
3058 }
3059
3060 *next_stmt_id += 1;
3061
3062 ident_stack.push(chain_ident);
3063 }
3064
3065 HydroNode::CrossSingleton { right, .. } => {
3066 let right_ident = ident_stack.pop().unwrap();
3067 let left_ident = ident_stack.pop().unwrap();
3068
3069 let cross_ident =
3070 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3071
3072 match builders_or_callback {
3073 BuildersOrCallback::Builders(graph_builders) => {
3074 let builder = graph_builders.get_dfir_mut(&out_location);
3075
3076 if right.metadata().location_id.is_top_level()
3077 && right.metadata().collection_kind.is_bounded()
3078 {
3079 builder.add_dfir(
3080 parse_quote! {
3081 #cross_ident = cross_singleton();
3082 #left_ident -> [input]#cross_ident;
3083 #right_ident -> persist::<'static>() -> [single]#cross_ident;
3084 },
3085 None,
3086 Some(&next_stmt_id.to_string()),
3087 );
3088 } else {
3089 builder.add_dfir(
3090 parse_quote! {
3091 #cross_ident = cross_singleton();
3092 #left_ident -> [input]#cross_ident;
3093 #right_ident -> [single]#cross_ident;
3094 },
3095 None,
3096 Some(&next_stmt_id.to_string()),
3097 );
3098 }
3099 }
3100 BuildersOrCallback::Callback(_, node_callback) => {
3101 node_callback(node, next_stmt_id);
3102 }
3103 }
3104
3105 *next_stmt_id += 1;
3106
3107 ident_stack.push(cross_ident);
3108 }
3109
3110 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3111 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3112 parse_quote!(cross_join_multiset)
3113 } else {
3114 parse_quote!(join_multiset)
3115 };
3116
3117 let (HydroNode::CrossProduct { left, right, .. }
3118 | HydroNode::Join { left, right, .. }) = node
3119 else {
3120 unreachable!()
3121 };
3122
3123 let is_top_level = left.metadata().location_id.is_top_level()
3124 && right.metadata().location_id.is_top_level();
3125 let left_lifetime = if left.metadata().location_id.is_top_level() {
3126 quote!('static)
3127 } else {
3128 quote!('tick)
3129 };
3130
3131 let right_lifetime = if right.metadata().location_id.is_top_level() {
3132 quote!('static)
3133 } else {
3134 quote!('tick)
3135 };
3136
3137 let right_ident = ident_stack.pop().unwrap();
3138 let left_ident = ident_stack.pop().unwrap();
3139
3140 let stream_ident =
3141 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3142
3143 match builders_or_callback {
3144 BuildersOrCallback::Builders(graph_builders) => {
3145 let builder = graph_builders.get_dfir_mut(&out_location);
3146 builder.add_dfir(
3147 if is_top_level {
3148 parse_quote! {
3151 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3152 #left_ident -> [0]#stream_ident;
3153 #right_ident -> [1]#stream_ident;
3154 }
3155 } else {
3156 parse_quote! {
3157 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3158 #left_ident -> [0]#stream_ident;
3159 #right_ident -> [1]#stream_ident;
3160 }
3161 }
3162 ,
3163 None,
3164 Some(&next_stmt_id.to_string()),
3165 );
3166 }
3167 BuildersOrCallback::Callback(_, node_callback) => {
3168 node_callback(node, next_stmt_id);
3169 }
3170 }
3171
3172 *next_stmt_id += 1;
3173
3174 ident_stack.push(stream_ident);
3175 }
3176
3177 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3178 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3179 parse_quote!(difference)
3180 } else {
3181 parse_quote!(anti_join)
3182 };
3183
3184 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3185 node
3186 else {
3187 unreachable!()
3188 };
3189
3190 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3191 quote!('static)
3192 } else {
3193 quote!('tick)
3194 };
3195
3196 let neg_ident = ident_stack.pop().unwrap();
3197 let pos_ident = ident_stack.pop().unwrap();
3198
3199 let stream_ident =
3200 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3201
3202 match builders_or_callback {
3203 BuildersOrCallback::Builders(graph_builders) => {
3204 let builder = graph_builders.get_dfir_mut(&out_location);
3205 builder.add_dfir(
3206 parse_quote! {
3207 #stream_ident = #operator::<'tick, #neg_lifetime>();
3208 #pos_ident -> [pos]#stream_ident;
3209 #neg_ident -> [neg]#stream_ident;
3210 },
3211 None,
3212 Some(&next_stmt_id.to_string()),
3213 );
3214 }
3215 BuildersOrCallback::Callback(_, node_callback) => {
3216 node_callback(node, next_stmt_id);
3217 }
3218 }
3219
3220 *next_stmt_id += 1;
3221
3222 ident_stack.push(stream_ident);
3223 }
3224
3225 HydroNode::ResolveFutures { .. } => {
3226 let input_ident = ident_stack.pop().unwrap();
3227
3228 let futures_ident =
3229 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3230
3231 match builders_or_callback {
3232 BuildersOrCallback::Builders(graph_builders) => {
3233 let builder = graph_builders.get_dfir_mut(&out_location);
3234 builder.add_dfir(
3235 parse_quote! {
3236 #futures_ident = #input_ident -> resolve_futures();
3237 },
3238 None,
3239 Some(&next_stmt_id.to_string()),
3240 );
3241 }
3242 BuildersOrCallback::Callback(_, node_callback) => {
3243 node_callback(node, next_stmt_id);
3244 }
3245 }
3246
3247 *next_stmt_id += 1;
3248
3249 ident_stack.push(futures_ident);
3250 }
3251
3252 HydroNode::ResolveFuturesBlocking { .. } => {
3253 let input_ident = ident_stack.pop().unwrap();
3254
3255 let futures_ident =
3256 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3257
3258 match builders_or_callback {
3259 BuildersOrCallback::Builders(graph_builders) => {
3260 let builder = graph_builders.get_dfir_mut(&out_location);
3261 builder.add_dfir(
3262 parse_quote! {
3263 #futures_ident = #input_ident -> resolve_futures_blocking();
3264 },
3265 None,
3266 Some(&next_stmt_id.to_string()),
3267 );
3268 }
3269 BuildersOrCallback::Callback(_, node_callback) => {
3270 node_callback(node, next_stmt_id);
3271 }
3272 }
3273
3274 *next_stmt_id += 1;
3275
3276 ident_stack.push(futures_ident);
3277 }
3278
3279 HydroNode::ResolveFuturesOrdered { .. } => {
3280 let input_ident = ident_stack.pop().unwrap();
3281
3282 let futures_ident =
3283 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3284
3285 match builders_or_callback {
3286 BuildersOrCallback::Builders(graph_builders) => {
3287 let builder = graph_builders.get_dfir_mut(&out_location);
3288 builder.add_dfir(
3289 parse_quote! {
3290 #futures_ident = #input_ident -> resolve_futures_ordered();
3291 },
3292 None,
3293 Some(&next_stmt_id.to_string()),
3294 );
3295 }
3296 BuildersOrCallback::Callback(_, node_callback) => {
3297 node_callback(node, next_stmt_id);
3298 }
3299 }
3300
3301 *next_stmt_id += 1;
3302
3303 ident_stack.push(futures_ident);
3304 }
3305
3306 HydroNode::Map { f, .. } => {
3307 let input_ident = ident_stack.pop().unwrap();
3308
3309 let map_ident =
3310 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3311
3312 match builders_or_callback {
3313 BuildersOrCallback::Builders(graph_builders) => {
3314 let builder = graph_builders.get_dfir_mut(&out_location);
3315 builder.add_dfir(
3316 parse_quote! {
3317 #map_ident = #input_ident -> map(#f);
3318 },
3319 None,
3320 Some(&next_stmt_id.to_string()),
3321 );
3322 }
3323 BuildersOrCallback::Callback(_, node_callback) => {
3324 node_callback(node, next_stmt_id);
3325 }
3326 }
3327
3328 *next_stmt_id += 1;
3329
3330 ident_stack.push(map_ident);
3331 }
3332
3333 HydroNode::FlatMap { f, .. } => {
3334 let input_ident = ident_stack.pop().unwrap();
3335
3336 let flat_map_ident =
3337 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3338
3339 match builders_or_callback {
3340 BuildersOrCallback::Builders(graph_builders) => {
3341 let builder = graph_builders.get_dfir_mut(&out_location);
3342 builder.add_dfir(
3343 parse_quote! {
3344 #flat_map_ident = #input_ident -> flat_map(#f);
3345 },
3346 None,
3347 Some(&next_stmt_id.to_string()),
3348 );
3349 }
3350 BuildersOrCallback::Callback(_, node_callback) => {
3351 node_callback(node, next_stmt_id);
3352 }
3353 }
3354
3355 *next_stmt_id += 1;
3356
3357 ident_stack.push(flat_map_ident);
3358 }
3359
3360 HydroNode::FlatMapStreamBlocking { f, .. } => {
3361 let input_ident = ident_stack.pop().unwrap();
3362
3363 let flat_map_stream_blocking_ident =
3364 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3365
3366 match builders_or_callback {
3367 BuildersOrCallback::Builders(graph_builders) => {
3368 let builder = graph_builders.get_dfir_mut(&out_location);
3369 builder.add_dfir(
3370 parse_quote! {
3371 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f);
3372 },
3373 None,
3374 Some(&next_stmt_id.to_string()),
3375 );
3376 }
3377 BuildersOrCallback::Callback(_, node_callback) => {
3378 node_callback(node, next_stmt_id);
3379 }
3380 }
3381
3382 *next_stmt_id += 1;
3383
3384 ident_stack.push(flat_map_stream_blocking_ident);
3385 }
3386
3387 HydroNode::Filter { f, .. } => {
3388 let input_ident = ident_stack.pop().unwrap();
3389
3390 let filter_ident =
3391 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3392
3393 match builders_or_callback {
3394 BuildersOrCallback::Builders(graph_builders) => {
3395 let builder = graph_builders.get_dfir_mut(&out_location);
3396 builder.add_dfir(
3397 parse_quote! {
3398 #filter_ident = #input_ident -> filter(#f);
3399 },
3400 None,
3401 Some(&next_stmt_id.to_string()),
3402 );
3403 }
3404 BuildersOrCallback::Callback(_, node_callback) => {
3405 node_callback(node, next_stmt_id);
3406 }
3407 }
3408
3409 *next_stmt_id += 1;
3410
3411 ident_stack.push(filter_ident);
3412 }
3413
3414 HydroNode::FilterMap { f, .. } => {
3415 let input_ident = ident_stack.pop().unwrap();
3416
3417 let filter_map_ident =
3418 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3419
3420 match builders_or_callback {
3421 BuildersOrCallback::Builders(graph_builders) => {
3422 let builder = graph_builders.get_dfir_mut(&out_location);
3423 builder.add_dfir(
3424 parse_quote! {
3425 #filter_map_ident = #input_ident -> filter_map(#f);
3426 },
3427 None,
3428 Some(&next_stmt_id.to_string()),
3429 );
3430 }
3431 BuildersOrCallback::Callback(_, node_callback) => {
3432 node_callback(node, next_stmt_id);
3433 }
3434 }
3435
3436 *next_stmt_id += 1;
3437
3438 ident_stack.push(filter_map_ident);
3439 }
3440
3441 HydroNode::Sort { .. } => {
3442 let input_ident = ident_stack.pop().unwrap();
3443
3444 let sort_ident =
3445 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3446
3447 match builders_or_callback {
3448 BuildersOrCallback::Builders(graph_builders) => {
3449 let builder = graph_builders.get_dfir_mut(&out_location);
3450 builder.add_dfir(
3451 parse_quote! {
3452 #sort_ident = #input_ident -> sort();
3453 },
3454 None,
3455 Some(&next_stmt_id.to_string()),
3456 );
3457 }
3458 BuildersOrCallback::Callback(_, node_callback) => {
3459 node_callback(node, next_stmt_id);
3460 }
3461 }
3462
3463 *next_stmt_id += 1;
3464
3465 ident_stack.push(sort_ident);
3466 }
3467
3468 HydroNode::DeferTick { .. } => {
3469 let input_ident = ident_stack.pop().unwrap();
3470
3471 let defer_tick_ident =
3472 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3473
3474 match builders_or_callback {
3475 BuildersOrCallback::Builders(graph_builders) => {
3476 let builder = graph_builders.get_dfir_mut(&out_location);
3477 builder.add_dfir(
3478 parse_quote! {
3479 #defer_tick_ident = #input_ident -> defer_tick_lazy();
3480 },
3481 None,
3482 Some(&next_stmt_id.to_string()),
3483 );
3484 }
3485 BuildersOrCallback::Callback(_, node_callback) => {
3486 node_callback(node, next_stmt_id);
3487 }
3488 }
3489
3490 *next_stmt_id += 1;
3491
3492 ident_stack.push(defer_tick_ident);
3493 }
3494
3495 HydroNode::Enumerate { input, .. } => {
3496 let input_ident = ident_stack.pop().unwrap();
3497
3498 let enumerate_ident =
3499 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3500
3501 match builders_or_callback {
3502 BuildersOrCallback::Builders(graph_builders) => {
3503 let builder = graph_builders.get_dfir_mut(&out_location);
3504 let lifetime = if input.metadata().location_id.is_top_level() {
3505 quote!('static)
3506 } else {
3507 quote!('tick)
3508 };
3509 builder.add_dfir(
3510 parse_quote! {
3511 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3512 },
3513 None,
3514 Some(&next_stmt_id.to_string()),
3515 );
3516 }
3517 BuildersOrCallback::Callback(_, node_callback) => {
3518 node_callback(node, next_stmt_id);
3519 }
3520 }
3521
3522 *next_stmt_id += 1;
3523
3524 ident_stack.push(enumerate_ident);
3525 }
3526
3527 HydroNode::Inspect { f, .. } => {
3528 let input_ident = ident_stack.pop().unwrap();
3529
3530 let inspect_ident =
3531 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3532
3533 match builders_or_callback {
3534 BuildersOrCallback::Builders(graph_builders) => {
3535 let builder = graph_builders.get_dfir_mut(&out_location);
3536 builder.add_dfir(
3537 parse_quote! {
3538 #inspect_ident = #input_ident -> inspect(#f);
3539 },
3540 None,
3541 Some(&next_stmt_id.to_string()),
3542 );
3543 }
3544 BuildersOrCallback::Callback(_, node_callback) => {
3545 node_callback(node, next_stmt_id);
3546 }
3547 }
3548
3549 *next_stmt_id += 1;
3550
3551 ident_stack.push(inspect_ident);
3552 }
3553
3554 HydroNode::Unique { input, .. } => {
3555 let input_ident = ident_stack.pop().unwrap();
3556
3557 let unique_ident =
3558 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3559
3560 match builders_or_callback {
3561 BuildersOrCallback::Builders(graph_builders) => {
3562 let builder = graph_builders.get_dfir_mut(&out_location);
3563 let lifetime = if input.metadata().location_id.is_top_level() {
3564 quote!('static)
3565 } else {
3566 quote!('tick)
3567 };
3568
3569 builder.add_dfir(
3570 parse_quote! {
3571 #unique_ident = #input_ident -> unique::<#lifetime>();
3572 },
3573 None,
3574 Some(&next_stmt_id.to_string()),
3575 );
3576 }
3577 BuildersOrCallback::Callback(_, node_callback) => {
3578 node_callback(node, next_stmt_id);
3579 }
3580 }
3581
3582 *next_stmt_id += 1;
3583
3584 ident_stack.push(unique_ident);
3585 }
3586
3587 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
3588 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3589 if input.metadata().location_id.is_top_level()
3590 && input.metadata().collection_kind.is_bounded()
3591 {
3592 parse_quote!(fold_no_replay)
3593 } else {
3594 parse_quote!(fold)
3595 }
3596 } else if matches!(node, HydroNode::Scan { .. }) {
3597 parse_quote!(scan)
3598 } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
3599 parse_quote!(scan_async_blocking)
3600 } else if let HydroNode::FoldKeyed { input, .. } = node {
3601 if input.metadata().location_id.is_top_level()
3602 && input.metadata().collection_kind.is_bounded()
3603 {
3604 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3605 } else {
3606 parse_quote!(fold_keyed)
3607 }
3608 } else {
3609 unreachable!()
3610 };
3611
3612 let (HydroNode::Fold { input, .. }
3613 | HydroNode::FoldKeyed { input, .. }
3614 | HydroNode::Scan { input, .. }
3615 | HydroNode::ScanAsyncBlocking { input, .. }) = node
3616 else {
3617 unreachable!()
3618 };
3619
3620 let lifetime = if input.metadata().location_id.is_top_level() {
3621 quote!('static)
3622 } else {
3623 quote!('tick)
3624 };
3625
3626 let input_ident = ident_stack.pop().unwrap();
3627
3628 let (HydroNode::Fold { init, acc, .. }
3629 | HydroNode::FoldKeyed { init, acc, .. }
3630 | HydroNode::Scan { init, acc, .. }
3631 | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
3632 else {
3633 unreachable!()
3634 };
3635
3636 let fold_ident =
3637 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3638
3639 match builders_or_callback {
3640 BuildersOrCallback::Builders(graph_builders) => {
3641 if matches!(node, HydroNode::Fold { .. })
3642 && node.metadata().location_id.is_top_level()
3643 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3644 && graph_builders.singleton_intermediates()
3645 && !node.metadata().collection_kind.is_bounded()
3646 {
3647 let builder = graph_builders.get_dfir_mut(&out_location);
3648
3649 let acc: syn::Expr = parse_quote!({
3650 let mut __inner = #acc;
3651 move |__state, __value| {
3652 __inner(__state, __value);
3653 Some(__state.clone())
3654 }
3655 });
3656
3657 builder.add_dfir(
3658 parse_quote! {
3659 source_iter([(#init)()]) -> [0]#fold_ident;
3660 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3661 #fold_ident = chain();
3662 },
3663 None,
3664 Some(&next_stmt_id.to_string()),
3665 );
3666 } else if matches!(node, HydroNode::FoldKeyed { .. })
3667 && node.metadata().location_id.is_top_level()
3668 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3669 && graph_builders.singleton_intermediates()
3670 && !node.metadata().collection_kind.is_bounded()
3671 {
3672 let builder = graph_builders.get_dfir_mut(&out_location);
3673
3674 let acc: syn::Expr = parse_quote!({
3675 let mut __init = #init;
3676 let mut __inner = #acc;
3677 move |__state, __kv: (_, _)| {
3678 let __state = __state
3680 .entry(::std::clone::Clone::clone(&__kv.0))
3681 .or_insert_with(|| (__init)());
3682 __inner(__state, __kv.1);
3683 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3684 }
3685 });
3686
3687 builder.add_dfir(
3688 parse_quote! {
3689 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3690 },
3691 None,
3692 Some(&next_stmt_id.to_string()),
3693 );
3694 } else {
3695 let builder = graph_builders.get_dfir_mut(&out_location);
3696 builder.add_dfir(
3697 parse_quote! {
3698 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3699 },
3700 None,
3701 Some(&next_stmt_id.to_string()),
3702 );
3703 }
3704 }
3705 BuildersOrCallback::Callback(_, node_callback) => {
3706 node_callback(node, next_stmt_id);
3707 }
3708 }
3709
3710 *next_stmt_id += 1;
3711
3712 ident_stack.push(fold_ident);
3713 }
3714
3715 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3716 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3717 if input.metadata().location_id.is_top_level()
3718 && input.metadata().collection_kind.is_bounded()
3719 {
3720 parse_quote!(reduce_no_replay)
3721 } else {
3722 parse_quote!(reduce)
3723 }
3724 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3725 if input.metadata().location_id.is_top_level()
3726 && input.metadata().collection_kind.is_bounded()
3727 {
3728 todo!(
3729 "Calling keyed reduce on a top-level bounded collection is not supported"
3730 )
3731 } else {
3732 parse_quote!(reduce_keyed)
3733 }
3734 } else {
3735 unreachable!()
3736 };
3737
3738 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3739 else {
3740 unreachable!()
3741 };
3742
3743 let lifetime = if input.metadata().location_id.is_top_level() {
3744 quote!('static)
3745 } else {
3746 quote!('tick)
3747 };
3748
3749 let input_ident = ident_stack.pop().unwrap();
3750
3751 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3752 else {
3753 unreachable!()
3754 };
3755
3756 let reduce_ident =
3757 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3758
3759 match builders_or_callback {
3760 BuildersOrCallback::Builders(graph_builders) => {
3761 if matches!(node, HydroNode::Reduce { .. })
3762 && node.metadata().location_id.is_top_level()
3763 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3764 && graph_builders.singleton_intermediates()
3765 && !node.metadata().collection_kind.is_bounded()
3766 {
3767 todo!(
3768 "Reduce with optional intermediates is not yet supported in simulator"
3769 );
3770 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3771 && node.metadata().location_id.is_top_level()
3772 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3773 && graph_builders.singleton_intermediates()
3774 && !node.metadata().collection_kind.is_bounded()
3775 {
3776 todo!(
3777 "Reduce keyed with optional intermediates is not yet supported in simulator"
3778 );
3779 } else {
3780 let builder = graph_builders.get_dfir_mut(&out_location);
3781 builder.add_dfir(
3782 parse_quote! {
3783 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3784 },
3785 None,
3786 Some(&next_stmt_id.to_string()),
3787 );
3788 }
3789 }
3790 BuildersOrCallback::Callback(_, node_callback) => {
3791 node_callback(node, next_stmt_id);
3792 }
3793 }
3794
3795 *next_stmt_id += 1;
3796
3797 ident_stack.push(reduce_ident);
3798 }
3799
3800 HydroNode::ReduceKeyedWatermark {
3801 f,
3802 input,
3803 metadata,
3804 ..
3805 } => {
3806 let lifetime = if input.metadata().location_id.is_top_level() {
3807 quote!('static)
3808 } else {
3809 quote!('tick)
3810 };
3811
3812 let watermark_ident = ident_stack.pop().unwrap();
3814 let input_ident = ident_stack.pop().unwrap();
3815
3816 let chain_ident = syn::Ident::new(
3817 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3818 Span::call_site(),
3819 );
3820
3821 let fold_ident =
3822 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3823
3824 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3825 && input.metadata().collection_kind.is_bounded()
3826 {
3827 parse_quote!(fold_no_replay)
3828 } else {
3829 parse_quote!(fold)
3830 };
3831
3832 match builders_or_callback {
3833 BuildersOrCallback::Builders(graph_builders) => {
3834 if metadata.location_id.is_top_level()
3835 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3836 && graph_builders.singleton_intermediates()
3837 && !metadata.collection_kind.is_bounded()
3838 {
3839 todo!(
3840 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3841 )
3842 } else {
3843 let builder = graph_builders.get_dfir_mut(&out_location);
3844 builder.add_dfir(
3845 parse_quote! {
3846 #chain_ident = chain();
3847 #input_ident
3848 -> map(|x| (Some(x), None))
3849 -> [0]#chain_ident;
3850 #watermark_ident
3851 -> map(|watermark| (None, Some(watermark)))
3852 -> [1]#chain_ident;
3853
3854 #fold_ident = #chain_ident
3855 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3856 let __reduce_keyed_fn = #f;
3857 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3858 if let Some((k, v)) = opt_payload {
3859 if let Some(curr_watermark) = *opt_curr_watermark {
3860 if k < curr_watermark {
3861 return;
3862 }
3863 }
3864 match map.entry(k) {
3865 ::std::collections::hash_map::Entry::Vacant(e) => {
3866 e.insert(v);
3867 }
3868 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3869 __reduce_keyed_fn(e.get_mut(), v);
3870 }
3871 }
3872 } else {
3873 let watermark = opt_watermark.unwrap();
3874 if let Some(curr_watermark) = *opt_curr_watermark {
3875 if watermark <= curr_watermark {
3876 return;
3877 }
3878 }
3879 *opt_curr_watermark = opt_watermark;
3880 map.retain(|k, _| *k >= watermark);
3881 }
3882 }
3883 })
3884 -> flat_map(|(map, _curr_watermark)| map);
3885 },
3886 None,
3887 Some(&next_stmt_id.to_string()),
3888 );
3889 }
3890 }
3891 BuildersOrCallback::Callback(_, node_callback) => {
3892 node_callback(node, next_stmt_id);
3893 }
3894 }
3895
3896 *next_stmt_id += 1;
3897
3898 ident_stack.push(fold_ident);
3899 }
3900
3901 HydroNode::Network {
3902 networking_info,
3903 serialize_fn: serialize_pipeline,
3904 instantiate_fn,
3905 deserialize_fn: deserialize_pipeline,
3906 input,
3907 ..
3908 } => {
3909 let input_ident = ident_stack.pop().unwrap();
3910
3911 let receiver_stream_ident =
3912 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3913
3914 match builders_or_callback {
3915 BuildersOrCallback::Builders(graph_builders) => {
3916 let (sink_expr, source_expr) = match instantiate_fn {
3917 DebugInstantiate::Building => (
3918 syn::parse_quote!(DUMMY_SINK),
3919 syn::parse_quote!(DUMMY_SOURCE),
3920 ),
3921
3922 DebugInstantiate::Finalized(finalized) => {
3923 (finalized.sink.clone(), finalized.source.clone())
3924 }
3925 };
3926
3927 graph_builders.create_network(
3928 &input.metadata().location_id,
3929 &out_location,
3930 input_ident,
3931 &receiver_stream_ident,
3932 serialize_pipeline.as_ref(),
3933 sink_expr,
3934 source_expr,
3935 deserialize_pipeline.as_ref(),
3936 *next_stmt_id,
3937 networking_info,
3938 );
3939 }
3940 BuildersOrCallback::Callback(_, node_callback) => {
3941 node_callback(node, next_stmt_id);
3942 }
3943 }
3944
3945 *next_stmt_id += 1;
3946
3947 ident_stack.push(receiver_stream_ident);
3948 }
3949
3950 HydroNode::ExternalInput {
3951 instantiate_fn,
3952 deserialize_fn: deserialize_pipeline,
3953 ..
3954 } => {
3955 let receiver_stream_ident =
3956 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3957
3958 match builders_or_callback {
3959 BuildersOrCallback::Builders(graph_builders) => {
3960 let (_, source_expr) = match instantiate_fn {
3961 DebugInstantiate::Building => (
3962 syn::parse_quote!(DUMMY_SINK),
3963 syn::parse_quote!(DUMMY_SOURCE),
3964 ),
3965
3966 DebugInstantiate::Finalized(finalized) => {
3967 (finalized.sink.clone(), finalized.source.clone())
3968 }
3969 };
3970
3971 graph_builders.create_external_source(
3972 &out_location,
3973 source_expr,
3974 &receiver_stream_ident,
3975 deserialize_pipeline.as_ref(),
3976 *next_stmt_id,
3977 );
3978 }
3979 BuildersOrCallback::Callback(_, node_callback) => {
3980 node_callback(node, next_stmt_id);
3981 }
3982 }
3983
3984 *next_stmt_id += 1;
3985
3986 ident_stack.push(receiver_stream_ident);
3987 }
3988
3989 HydroNode::Counter {
3990 tag,
3991 duration,
3992 prefix,
3993 ..
3994 } => {
3995 let input_ident = ident_stack.pop().unwrap();
3996
3997 let counter_ident =
3998 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3999
4000 match builders_or_callback {
4001 BuildersOrCallback::Builders(graph_builders) => {
4002 let arg = format!("{}({})", prefix, tag);
4003 let builder = graph_builders.get_dfir_mut(&out_location);
4004 builder.add_dfir(
4005 parse_quote! {
4006 #counter_ident = #input_ident -> _counter(#arg, #duration);
4007 },
4008 None,
4009 Some(&next_stmt_id.to_string()),
4010 );
4011 }
4012 BuildersOrCallback::Callback(_, node_callback) => {
4013 node_callback(node, next_stmt_id);
4014 }
4015 }
4016
4017 *next_stmt_id += 1;
4018
4019 ident_stack.push(counter_ident);
4020 }
4021 }
4022 },
4023 seen_tees,
4024 false,
4025 );
4026
4027 ident_stack
4028 .pop()
4029 .expect("ident_stack should have exactly one element after traversal")
4030 }
4031
4032 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4033 match self {
4034 HydroNode::Placeholder => {
4035 panic!()
4036 }
4037 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4038 HydroNode::Source { source, .. } => match source {
4039 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4040 HydroSource::ExternalNetwork()
4041 | HydroSource::Spin()
4042 | HydroSource::ClusterMembers(_, _)
4043 | HydroSource::Embedded(_)
4044 | HydroSource::EmbeddedSingleton(_) => {} },
4046 HydroNode::SingletonSource { value, .. } => {
4047 transform(value);
4048 }
4049 HydroNode::CycleSource { .. }
4050 | HydroNode::Tee { .. }
4051 | HydroNode::YieldConcat { .. }
4052 | HydroNode::BeginAtomic { .. }
4053 | HydroNode::EndAtomic { .. }
4054 | HydroNode::Batch { .. }
4055 | HydroNode::Chain { .. }
4056 | HydroNode::ChainFirst { .. }
4057 | HydroNode::CrossProduct { .. }
4058 | HydroNode::CrossSingleton { .. }
4059 | HydroNode::ResolveFutures { .. }
4060 | HydroNode::ResolveFuturesBlocking { .. }
4061 | HydroNode::ResolveFuturesOrdered { .. }
4062 | HydroNode::Join { .. }
4063 | HydroNode::Difference { .. }
4064 | HydroNode::AntiJoin { .. }
4065 | HydroNode::DeferTick { .. }
4066 | HydroNode::Enumerate { .. }
4067 | HydroNode::Unique { .. }
4068 | HydroNode::Sort { .. } => {}
4069 HydroNode::Map { f, .. }
4070 | HydroNode::FlatMap { f, .. }
4071 | HydroNode::FlatMapStreamBlocking { f, .. }
4072 | HydroNode::Filter { f, .. }
4073 | HydroNode::FilterMap { f, .. }
4074 | HydroNode::Inspect { f, .. }
4075 | HydroNode::Partition { f, .. }
4076 | HydroNode::Reduce { f, .. }
4077 | HydroNode::ReduceKeyed { f, .. }
4078 | HydroNode::ReduceKeyedWatermark { f, .. } => {
4079 transform(f);
4080 }
4081 HydroNode::Fold { init, acc, .. }
4082 | HydroNode::Scan { init, acc, .. }
4083 | HydroNode::ScanAsyncBlocking { init, acc, .. }
4084 | HydroNode::FoldKeyed { init, acc, .. } => {
4085 transform(init);
4086 transform(acc);
4087 }
4088 HydroNode::Network {
4089 serialize_fn,
4090 deserialize_fn,
4091 ..
4092 } => {
4093 if let Some(serialize_fn) = serialize_fn {
4094 transform(serialize_fn);
4095 }
4096 if let Some(deserialize_fn) = deserialize_fn {
4097 transform(deserialize_fn);
4098 }
4099 }
4100 HydroNode::ExternalInput { deserialize_fn, .. } => {
4101 if let Some(deserialize_fn) = deserialize_fn {
4102 transform(deserialize_fn);
4103 }
4104 }
4105 HydroNode::Counter { duration, .. } => {
4106 transform(duration);
4107 }
4108 }
4109 }
4110
4111 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4112 &self.metadata().op
4113 }
4114
4115 pub fn metadata(&self) -> &HydroIrMetadata {
4116 match self {
4117 HydroNode::Placeholder => {
4118 panic!()
4119 }
4120 HydroNode::Cast { metadata, .. } => metadata,
4121 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4122 HydroNode::Source { metadata, .. } => metadata,
4123 HydroNode::SingletonSource { metadata, .. } => metadata,
4124 HydroNode::CycleSource { metadata, .. } => metadata,
4125 HydroNode::Tee { metadata, .. } => metadata,
4126 HydroNode::Partition { metadata, .. } => metadata,
4127 HydroNode::YieldConcat { metadata, .. } => metadata,
4128 HydroNode::BeginAtomic { metadata, .. } => metadata,
4129 HydroNode::EndAtomic { metadata, .. } => metadata,
4130 HydroNode::Batch { metadata, .. } => metadata,
4131 HydroNode::Chain { metadata, .. } => metadata,
4132 HydroNode::ChainFirst { metadata, .. } => metadata,
4133 HydroNode::CrossProduct { metadata, .. } => metadata,
4134 HydroNode::CrossSingleton { metadata, .. } => metadata,
4135 HydroNode::Join { metadata, .. } => metadata,
4136 HydroNode::Difference { metadata, .. } => metadata,
4137 HydroNode::AntiJoin { metadata, .. } => metadata,
4138 HydroNode::ResolveFutures { metadata, .. } => metadata,
4139 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4140 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4141 HydroNode::Map { metadata, .. } => metadata,
4142 HydroNode::FlatMap { metadata, .. } => metadata,
4143 HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4144 HydroNode::Filter { metadata, .. } => metadata,
4145 HydroNode::FilterMap { metadata, .. } => metadata,
4146 HydroNode::DeferTick { metadata, .. } => metadata,
4147 HydroNode::Enumerate { metadata, .. } => metadata,
4148 HydroNode::Inspect { metadata, .. } => metadata,
4149 HydroNode::Unique { metadata, .. } => metadata,
4150 HydroNode::Sort { metadata, .. } => metadata,
4151 HydroNode::Scan { metadata, .. } => metadata,
4152 HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4153 HydroNode::Fold { metadata, .. } => metadata,
4154 HydroNode::FoldKeyed { metadata, .. } => metadata,
4155 HydroNode::Reduce { metadata, .. } => metadata,
4156 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4157 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4158 HydroNode::ExternalInput { metadata, .. } => metadata,
4159 HydroNode::Network { metadata, .. } => metadata,
4160 HydroNode::Counter { metadata, .. } => metadata,
4161 }
4162 }
4163
4164 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4165 &mut self.metadata_mut().op
4166 }
4167
4168 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4169 match self {
4170 HydroNode::Placeholder => {
4171 panic!()
4172 }
4173 HydroNode::Cast { metadata, .. } => metadata,
4174 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4175 HydroNode::Source { metadata, .. } => metadata,
4176 HydroNode::SingletonSource { metadata, .. } => metadata,
4177 HydroNode::CycleSource { metadata, .. } => metadata,
4178 HydroNode::Tee { metadata, .. } => metadata,
4179 HydroNode::Partition { metadata, .. } => metadata,
4180 HydroNode::YieldConcat { metadata, .. } => metadata,
4181 HydroNode::BeginAtomic { metadata, .. } => metadata,
4182 HydroNode::EndAtomic { metadata, .. } => metadata,
4183 HydroNode::Batch { metadata, .. } => metadata,
4184 HydroNode::Chain { metadata, .. } => metadata,
4185 HydroNode::ChainFirst { metadata, .. } => metadata,
4186 HydroNode::CrossProduct { metadata, .. } => metadata,
4187 HydroNode::CrossSingleton { metadata, .. } => metadata,
4188 HydroNode::Join { metadata, .. } => metadata,
4189 HydroNode::Difference { metadata, .. } => metadata,
4190 HydroNode::AntiJoin { metadata, .. } => metadata,
4191 HydroNode::ResolveFutures { metadata, .. } => metadata,
4192 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4193 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4194 HydroNode::Map { metadata, .. } => metadata,
4195 HydroNode::FlatMap { metadata, .. } => metadata,
4196 HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4197 HydroNode::Filter { metadata, .. } => metadata,
4198 HydroNode::FilterMap { metadata, .. } => metadata,
4199 HydroNode::DeferTick { metadata, .. } => metadata,
4200 HydroNode::Enumerate { metadata, .. } => metadata,
4201 HydroNode::Inspect { metadata, .. } => metadata,
4202 HydroNode::Unique { metadata, .. } => metadata,
4203 HydroNode::Sort { metadata, .. } => metadata,
4204 HydroNode::Scan { metadata, .. } => metadata,
4205 HydroNode::ScanAsyncBlocking { metadata, .. } => metadata,
4206 HydroNode::Fold { metadata, .. } => metadata,
4207 HydroNode::FoldKeyed { metadata, .. } => metadata,
4208 HydroNode::Reduce { metadata, .. } => metadata,
4209 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4210 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4211 HydroNode::ExternalInput { metadata, .. } => metadata,
4212 HydroNode::Network { metadata, .. } => metadata,
4213 HydroNode::Counter { metadata, .. } => metadata,
4214 }
4215 }
4216
4217 pub fn input(&self) -> Vec<&HydroNode> {
4218 match self {
4219 HydroNode::Placeholder => {
4220 panic!()
4221 }
4222 HydroNode::Source { .. }
4223 | HydroNode::SingletonSource { .. }
4224 | HydroNode::ExternalInput { .. }
4225 | HydroNode::CycleSource { .. }
4226 | HydroNode::Tee { .. }
4227 | HydroNode::Partition { .. } => {
4228 vec![]
4230 }
4231 HydroNode::Cast { inner, .. }
4232 | HydroNode::ObserveNonDet { inner, .. }
4233 | HydroNode::YieldConcat { inner, .. }
4234 | HydroNode::BeginAtomic { inner, .. }
4235 | HydroNode::EndAtomic { inner, .. }
4236 | HydroNode::Batch { inner, .. } => {
4237 vec![inner]
4238 }
4239 HydroNode::Chain { first, second, .. } => {
4240 vec![first, second]
4241 }
4242 HydroNode::ChainFirst { first, second, .. } => {
4243 vec![first, second]
4244 }
4245 HydroNode::CrossProduct { left, right, .. }
4246 | HydroNode::CrossSingleton { left, right, .. }
4247 | HydroNode::Join { left, right, .. } => {
4248 vec![left, right]
4249 }
4250 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4251 vec![pos, neg]
4252 }
4253 HydroNode::Map { input, .. }
4254 | HydroNode::FlatMap { input, .. }
4255 | HydroNode::FlatMapStreamBlocking { input, .. }
4256 | HydroNode::Filter { input, .. }
4257 | HydroNode::FilterMap { input, .. }
4258 | HydroNode::Sort { input, .. }
4259 | HydroNode::DeferTick { input, .. }
4260 | HydroNode::Enumerate { input, .. }
4261 | HydroNode::Inspect { input, .. }
4262 | HydroNode::Unique { input, .. }
4263 | HydroNode::Network { input, .. }
4264 | HydroNode::Counter { input, .. }
4265 | HydroNode::ResolveFutures { input, .. }
4266 | HydroNode::ResolveFuturesBlocking { input, .. }
4267 | HydroNode::ResolveFuturesOrdered { input, .. }
4268 | HydroNode::Fold { input, .. }
4269 | HydroNode::FoldKeyed { input, .. }
4270 | HydroNode::Reduce { input, .. }
4271 | HydroNode::ReduceKeyed { input, .. }
4272 | HydroNode::Scan { input, .. }
4273 | HydroNode::ScanAsyncBlocking { input, .. } => {
4274 vec![input]
4275 }
4276 HydroNode::ReduceKeyedWatermark {
4277 input, watermark, ..
4278 } => {
4279 vec![input, watermark]
4280 }
4281 }
4282 }
4283
4284 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4285 self.input()
4286 .iter()
4287 .map(|input_node| input_node.metadata())
4288 .collect()
4289 }
4290
4291 pub fn is_shared_with_others(&self) -> bool {
4295 match self {
4296 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4297 Rc::strong_count(&inner.0) > 1
4298 }
4299 _ => false,
4300 }
4301 }
4302
4303 pub fn print_root(&self) -> String {
4304 match self {
4305 HydroNode::Placeholder => {
4306 panic!()
4307 }
4308 HydroNode::Cast { .. } => "Cast()".to_owned(),
4309 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4310 HydroNode::Source { source, .. } => format!("Source({:?})", source),
4311 HydroNode::SingletonSource {
4312 value,
4313 first_tick_only,
4314 ..
4315 } => format!(
4316 "SingletonSource({:?}, first_tick_only={})",
4317 value, first_tick_only
4318 ),
4319 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4320 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4321 HydroNode::Partition { f, is_true, .. } => {
4322 format!("Partition({:?}, is_true={})", f, is_true)
4323 }
4324 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4325 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4326 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4327 HydroNode::Batch { .. } => "Batch()".to_owned(),
4328 HydroNode::Chain { first, second, .. } => {
4329 format!("Chain({}, {})", first.print_root(), second.print_root())
4330 }
4331 HydroNode::ChainFirst { first, second, .. } => {
4332 format!(
4333 "ChainFirst({}, {})",
4334 first.print_root(),
4335 second.print_root()
4336 )
4337 }
4338 HydroNode::CrossProduct { left, right, .. } => {
4339 format!(
4340 "CrossProduct({}, {})",
4341 left.print_root(),
4342 right.print_root()
4343 )
4344 }
4345 HydroNode::CrossSingleton { left, right, .. } => {
4346 format!(
4347 "CrossSingleton({}, {})",
4348 left.print_root(),
4349 right.print_root()
4350 )
4351 }
4352 HydroNode::Join { left, right, .. } => {
4353 format!("Join({}, {})", left.print_root(), right.print_root())
4354 }
4355 HydroNode::Difference { pos, neg, .. } => {
4356 format!("Difference({}, {})", pos.print_root(), neg.print_root())
4357 }
4358 HydroNode::AntiJoin { pos, neg, .. } => {
4359 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4360 }
4361 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4362 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4363 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4364 HydroNode::Map { f, .. } => format!("Map({:?})", f),
4365 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4366 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
4367 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4368 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4369 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4370 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4371 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4372 HydroNode::Unique { .. } => "Unique()".to_owned(),
4373 HydroNode::Sort { .. } => "Sort()".to_owned(),
4374 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4375 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4376 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
4377 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
4378 }
4379 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4380 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4381 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4382 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4383 HydroNode::Network { .. } => "Network()".to_owned(),
4384 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4385 HydroNode::Counter { tag, duration, .. } => {
4386 format!("Counter({:?}, {:?})", tag, duration)
4387 }
4388 }
4389 }
4390}
4391
4392#[cfg(feature = "build")]
4393fn instantiate_network<'a, D>(
4394 env: &mut D::InstantiateEnv,
4395 from_location: &LocationId,
4396 to_location: &LocationId,
4397 processes: &SparseSecondaryMap<LocationKey, D::Process>,
4398 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
4399 name: Option<&str>,
4400 networking_info: &crate::networking::NetworkingInfo,
4401) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
4402where
4403 D: Deploy<'a>,
4404{
4405 let ((sink, source), connect_fn) = match (from_location, to_location) {
4406 (&LocationId::Process(from), &LocationId::Process(to)) => {
4407 let from_node = processes
4408 .get(from)
4409 .unwrap_or_else(|| {
4410 panic!("A process used in the graph was not instantiated: {}", from)
4411 })
4412 .clone();
4413 let to_node = processes
4414 .get(to)
4415 .unwrap_or_else(|| {
4416 panic!("A process used in the graph was not instantiated: {}", to)
4417 })
4418 .clone();
4419
4420 let sink_port = from_node.next_port();
4421 let source_port = to_node.next_port();
4422
4423 (
4424 D::o2o_sink_source(
4425 env,
4426 &from_node,
4427 &sink_port,
4428 &to_node,
4429 &source_port,
4430 name,
4431 networking_info,
4432 ),
4433 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
4434 )
4435 }
4436 (&LocationId::Process(from), &LocationId::Cluster(to)) => {
4437 let from_node = processes
4438 .get(from)
4439 .unwrap_or_else(|| {
4440 panic!("A process used in the graph was not instantiated: {}", from)
4441 })
4442 .clone();
4443 let to_node = clusters
4444 .get(to)
4445 .unwrap_or_else(|| {
4446 panic!("A cluster used in the graph was not instantiated: {}", to)
4447 })
4448 .clone();
4449
4450 let sink_port = from_node.next_port();
4451 let source_port = to_node.next_port();
4452
4453 (
4454 D::o2m_sink_source(
4455 env,
4456 &from_node,
4457 &sink_port,
4458 &to_node,
4459 &source_port,
4460 name,
4461 networking_info,
4462 ),
4463 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
4464 )
4465 }
4466 (&LocationId::Cluster(from), &LocationId::Process(to)) => {
4467 let from_node = clusters
4468 .get(from)
4469 .unwrap_or_else(|| {
4470 panic!("A cluster used in the graph was not instantiated: {}", from)
4471 })
4472 .clone();
4473 let to_node = processes
4474 .get(to)
4475 .unwrap_or_else(|| {
4476 panic!("A process used in the graph was not instantiated: {}", to)
4477 })
4478 .clone();
4479
4480 let sink_port = from_node.next_port();
4481 let source_port = to_node.next_port();
4482
4483 (
4484 D::m2o_sink_source(
4485 env,
4486 &from_node,
4487 &sink_port,
4488 &to_node,
4489 &source_port,
4490 name,
4491 networking_info,
4492 ),
4493 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
4494 )
4495 }
4496 (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
4497 let from_node = clusters
4498 .get(from)
4499 .unwrap_or_else(|| {
4500 panic!("A cluster used in the graph was not instantiated: {}", from)
4501 })
4502 .clone();
4503 let to_node = clusters
4504 .get(to)
4505 .unwrap_or_else(|| {
4506 panic!("A cluster used in the graph was not instantiated: {}", to)
4507 })
4508 .clone();
4509
4510 let sink_port = from_node.next_port();
4511 let source_port = to_node.next_port();
4512
4513 (
4514 D::m2m_sink_source(
4515 env,
4516 &from_node,
4517 &sink_port,
4518 &to_node,
4519 &source_port,
4520 name,
4521 networking_info,
4522 ),
4523 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
4524 )
4525 }
4526 (LocationId::Tick(_, _), _) => panic!(),
4527 (_, LocationId::Tick(_, _)) => panic!(),
4528 (LocationId::Atomic(_), _) => panic!(),
4529 (_, LocationId::Atomic(_)) => panic!(),
4530 };
4531 (sink, source, connect_fn)
4532}
4533
4534#[cfg(test)]
4535mod test {
4536 use std::mem::size_of;
4537
4538 use stageleft::{QuotedWithContext, q};
4539
4540 use super::*;
4541
4542 #[test]
4543 #[cfg_attr(
4544 not(feature = "build"),
4545 ignore = "expects inclusion of feature-gated fields"
4546 )]
4547 fn hydro_node_size() {
4548 assert_eq!(size_of::<HydroNode>(), 248);
4549 }
4550
4551 #[test]
4552 #[cfg_attr(
4553 not(feature = "build"),
4554 ignore = "expects inclusion of feature-gated fields"
4555 )]
4556 fn hydro_root_size() {
4557 assert_eq!(size_of::<HydroRoot>(), 136);
4558 }
4559
4560 #[test]
4561 fn test_simplify_q_macro_basic() {
4562 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
4564 let result = simplify_q_macro(simple_expr.clone());
4565 assert_eq!(result, simple_expr);
4566 }
4567
4568 #[test]
4569 fn test_simplify_q_macro_actual_stageleft_call() {
4570 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
4572 let result = simplify_q_macro(stageleft_call);
4573 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4576 }
4577
4578 #[test]
4579 fn test_closure_no_pipe_at_start() {
4580 let stageleft_call = q!({
4582 let foo = 123;
4583 move |b: usize| b + foo
4584 })
4585 .splice_fn1_ctx(&());
4586 let result = simplify_q_macro(stageleft_call);
4587 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4588 }
4589}