1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26#[cfg(feature = "build")]
27use crate::compile::builder::ClockId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31use crate::location::dynamic::LocationId;
32use crate::location::{LocationKey, NetworkHint};
33
34pub mod backtrace;
35use backtrace::Backtrace;
36
37#[derive(Clone, Hash)]
41pub struct DebugExpr(pub Box<syn::Expr>);
42
43impl From<syn::Expr> for DebugExpr {
44 fn from(expr: syn::Expr) -> Self {
45 Self(Box::new(expr))
46 }
47}
48
49impl Deref for DebugExpr {
50 type Target = syn::Expr;
51
52 fn deref(&self) -> &Self::Target {
53 &self.0
54 }
55}
56
57impl ToTokens for DebugExpr {
58 fn to_tokens(&self, tokens: &mut TokenStream) {
59 self.0.to_tokens(tokens);
60 }
61}
62
63impl Debug for DebugExpr {
64 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
65 write!(f, "{}", self.0.to_token_stream())
66 }
67}
68
69impl Display for DebugExpr {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 let original = self.0.as_ref().clone();
72 let simplified = simplify_q_macro(original);
73
74 write!(f, "q!({})", quote::quote!(#simplified))
77 }
78}
79
80fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
82 let mut simplifier = QMacroSimplifier::new();
85 simplifier.visit_expr_mut(&mut expr);
86
87 if let Some(simplified) = simplifier.simplified_result {
89 simplified
90 } else {
91 expr
92 }
93}
94
95#[derive(Default)]
97pub struct QMacroSimplifier {
98 pub simplified_result: Option<syn::Expr>,
99}
100
101impl QMacroSimplifier {
102 pub fn new() -> Self {
103 Self::default()
104 }
105}
106
107impl VisitMut for QMacroSimplifier {
108 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
109 if self.simplified_result.is_some() {
111 return;
112 }
113
114 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
115 && self.is_stageleft_runtime_support_call(&path_expr.path)
117 && let Some(closure) = self.extract_closure_from_args(&call.args)
119 {
120 self.simplified_result = Some(closure);
121 return;
122 }
123
124 syn::visit_mut::visit_expr_mut(self, expr);
127 }
128}
129
130impl QMacroSimplifier {
131 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
132 if let Some(last_segment) = path.segments.last() {
134 let fn_name = last_segment.ident.to_string();
135 fn_name.contains("_type_hint")
137 && path.segments.len() > 2
138 && path.segments[0].ident == "stageleft"
139 && path.segments[1].ident == "runtime_support"
140 } else {
141 false
142 }
143 }
144
145 fn extract_closure_from_args(
146 &self,
147 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
148 ) -> Option<syn::Expr> {
149 for arg in args {
151 if let syn::Expr::Closure(_) = arg {
152 return Some(arg.clone());
153 }
154 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
156 return Some(closure_expr);
157 }
158 }
159 None
160 }
161
162 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
163 let mut visitor = ClosureFinder {
164 found_closure: None,
165 prefer_inner_blocks: true,
166 };
167 visitor.visit_expr(expr);
168 visitor.found_closure
169 }
170}
171
172struct ClosureFinder {
174 found_closure: Option<syn::Expr>,
175 prefer_inner_blocks: bool,
176}
177
178impl<'ast> Visit<'ast> for ClosureFinder {
179 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
180 if self.found_closure.is_some() {
182 return;
183 }
184
185 match expr {
186 syn::Expr::Closure(_) => {
187 self.found_closure = Some(expr.clone());
188 }
189 syn::Expr::Block(block) if self.prefer_inner_blocks => {
190 for stmt in &block.block.stmts {
192 if let syn::Stmt::Expr(stmt_expr, _) = stmt
193 && let syn::Expr::Block(_) = stmt_expr
194 {
195 let mut inner_visitor = ClosureFinder {
197 found_closure: None,
198 prefer_inner_blocks: false, };
200 inner_visitor.visit_expr(stmt_expr);
201 if inner_visitor.found_closure.is_some() {
202 self.found_closure = Some(stmt_expr.clone());
204 return;
205 }
206 }
207 }
208
209 visit::visit_expr(self, expr);
211
212 if self.found_closure.is_some() {
215 }
217 }
218 _ => {
219 visit::visit_expr(self, expr);
221 }
222 }
223 }
224}
225
226#[derive(Clone, PartialEq, Eq, Hash)]
230pub struct DebugType(pub Box<syn::Type>);
231
232impl From<syn::Type> for DebugType {
233 fn from(t: syn::Type) -> Self {
234 Self(Box::new(t))
235 }
236}
237
238impl Deref for DebugType {
239 type Target = syn::Type;
240
241 fn deref(&self) -> &Self::Target {
242 &self.0
243 }
244}
245
246impl ToTokens for DebugType {
247 fn to_tokens(&self, tokens: &mut TokenStream) {
248 self.0.to_tokens(tokens);
249 }
250}
251
252impl Debug for DebugType {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 write!(f, "{}", self.0.to_token_stream())
255 }
256}
257
258pub enum DebugInstantiate {
259 Building,
260 Finalized(Box<DebugInstantiateFinalized>),
261}
262
263#[cfg_attr(
264 not(feature = "build"),
265 expect(
266 dead_code,
267 reason = "sink, source unused without `feature = \"build\"`."
268 )
269)]
270pub struct DebugInstantiateFinalized {
271 sink: syn::Expr,
272 source: syn::Expr,
273 connect_fn: Option<Box<dyn FnOnce()>>,
274}
275
276impl From<DebugInstantiateFinalized> for DebugInstantiate {
277 fn from(f: DebugInstantiateFinalized) -> Self {
278 Self::Finalized(Box::new(f))
279 }
280}
281
282impl Debug for DebugInstantiate {
283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284 write!(f, "<network instantiate>")
285 }
286}
287
288impl Hash for DebugInstantiate {
289 fn hash<H: Hasher>(&self, _state: &mut H) {
290 }
292}
293
294impl Clone for DebugInstantiate {
295 fn clone(&self) -> Self {
296 match self {
297 DebugInstantiate::Building => DebugInstantiate::Building,
298 DebugInstantiate::Finalized(_) => {
299 panic!("DebugInstantiate::Finalized should not be cloned")
300 }
301 }
302 }
303}
304
305#[derive(Debug, Hash, Clone)]
314pub enum ClusterMembersState {
315 Uninit,
317 Stream(DebugExpr),
320 Tee(LocationId, LocationId),
324}
325
326#[derive(Debug, Hash, Clone)]
328pub enum HydroSource {
329 Stream(DebugExpr),
330 ExternalNetwork(),
331 Iter(DebugExpr),
332 Spin(),
333 ClusterMembers(LocationId, ClusterMembersState),
334 Embedded(syn::Ident),
335 EmbeddedSingleton(syn::Ident),
336}
337
338#[cfg(feature = "build")]
339pub trait DfirBuilder {
345 fn singleton_intermediates(&self) -> bool;
347
348 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
350
351 fn batch(
352 &mut self,
353 in_ident: syn::Ident,
354 in_location: &LocationId,
355 in_kind: &CollectionKind,
356 out_ident: &syn::Ident,
357 out_location: &LocationId,
358 op_meta: &HydroIrOpMetadata,
359 );
360 fn yield_from_tick(
361 &mut self,
362 in_ident: syn::Ident,
363 in_location: &LocationId,
364 in_kind: &CollectionKind,
365 out_ident: &syn::Ident,
366 out_location: &LocationId,
367 );
368
369 fn begin_atomic(
370 &mut self,
371 in_ident: syn::Ident,
372 in_location: &LocationId,
373 in_kind: &CollectionKind,
374 out_ident: &syn::Ident,
375 out_location: &LocationId,
376 op_meta: &HydroIrOpMetadata,
377 );
378 fn end_atomic(
379 &mut self,
380 in_ident: syn::Ident,
381 in_location: &LocationId,
382 in_kind: &CollectionKind,
383 out_ident: &syn::Ident,
384 );
385
386 #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
387 fn observe_nondet(
388 &mut self,
389 trusted: bool,
390 location: &LocationId,
391 in_ident: syn::Ident,
392 in_kind: &CollectionKind,
393 out_ident: &syn::Ident,
394 out_kind: &CollectionKind,
395 op_meta: &HydroIrOpMetadata,
396 );
397
398 #[expect(clippy::too_many_arguments, reason = "TODO")]
399 fn create_network(
400 &mut self,
401 from: &LocationId,
402 to: &LocationId,
403 input_ident: syn::Ident,
404 out_ident: &syn::Ident,
405 serialize: Option<&DebugExpr>,
406 sink: syn::Expr,
407 source: syn::Expr,
408 deserialize: Option<&DebugExpr>,
409 tag_id: usize,
410 networking_info: &crate::networking::NetworkingInfo,
411 );
412
413 fn create_external_source(
414 &mut self,
415 on: &LocationId,
416 source_expr: syn::Expr,
417 out_ident: &syn::Ident,
418 deserialize: Option<&DebugExpr>,
419 tag_id: usize,
420 );
421
422 fn create_external_output(
423 &mut self,
424 on: &LocationId,
425 sink_expr: syn::Expr,
426 input_ident: &syn::Ident,
427 serialize: Option<&DebugExpr>,
428 tag_id: usize,
429 );
430}
431
432#[cfg(feature = "build")]
433impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
434 fn singleton_intermediates(&self) -> bool {
435 false
436 }
437
438 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
439 self.entry(location.root().key())
440 .expect("location was removed")
441 .or_default()
442 }
443
444 fn batch(
445 &mut self,
446 in_ident: syn::Ident,
447 in_location: &LocationId,
448 in_kind: &CollectionKind,
449 out_ident: &syn::Ident,
450 _out_location: &LocationId,
451 _op_meta: &HydroIrOpMetadata,
452 ) {
453 let builder = self.get_dfir_mut(in_location.root());
454 if in_kind.is_bounded()
455 && matches!(
456 in_kind,
457 CollectionKind::Singleton { .. }
458 | CollectionKind::Optional { .. }
459 | CollectionKind::KeyedSingleton { .. }
460 )
461 {
462 assert!(in_location.is_top_level());
463 builder.add_dfir(
464 parse_quote! {
465 #out_ident = #in_ident -> persist::<'static>();
466 },
467 None,
468 None,
469 );
470 } else {
471 builder.add_dfir(
472 parse_quote! {
473 #out_ident = #in_ident;
474 },
475 None,
476 None,
477 );
478 }
479 }
480
481 fn yield_from_tick(
482 &mut self,
483 in_ident: syn::Ident,
484 in_location: &LocationId,
485 _in_kind: &CollectionKind,
486 out_ident: &syn::Ident,
487 _out_location: &LocationId,
488 ) {
489 let builder = self.get_dfir_mut(in_location.root());
490 builder.add_dfir(
491 parse_quote! {
492 #out_ident = #in_ident;
493 },
494 None,
495 None,
496 );
497 }
498
499 fn begin_atomic(
500 &mut self,
501 in_ident: syn::Ident,
502 in_location: &LocationId,
503 _in_kind: &CollectionKind,
504 out_ident: &syn::Ident,
505 _out_location: &LocationId,
506 _op_meta: &HydroIrOpMetadata,
507 ) {
508 let builder = self.get_dfir_mut(in_location.root());
509 builder.add_dfir(
510 parse_quote! {
511 #out_ident = #in_ident;
512 },
513 None,
514 None,
515 );
516 }
517
518 fn end_atomic(
519 &mut self,
520 in_ident: syn::Ident,
521 in_location: &LocationId,
522 _in_kind: &CollectionKind,
523 out_ident: &syn::Ident,
524 ) {
525 let builder = self.get_dfir_mut(in_location.root());
526 builder.add_dfir(
527 parse_quote! {
528 #out_ident = #in_ident;
529 },
530 None,
531 None,
532 );
533 }
534
535 fn observe_nondet(
536 &mut self,
537 _trusted: bool,
538 location: &LocationId,
539 in_ident: syn::Ident,
540 _in_kind: &CollectionKind,
541 out_ident: &syn::Ident,
542 _out_kind: &CollectionKind,
543 _op_meta: &HydroIrOpMetadata,
544 ) {
545 let builder = self.get_dfir_mut(location);
546 builder.add_dfir(
547 parse_quote! {
548 #out_ident = #in_ident;
549 },
550 None,
551 None,
552 );
553 }
554
555 fn create_network(
556 &mut self,
557 from: &LocationId,
558 to: &LocationId,
559 input_ident: syn::Ident,
560 out_ident: &syn::Ident,
561 serialize: Option<&DebugExpr>,
562 sink: syn::Expr,
563 source: syn::Expr,
564 deserialize: Option<&DebugExpr>,
565 tag_id: usize,
566 _networking_info: &crate::networking::NetworkingInfo,
567 ) {
568 let sender_builder = self.get_dfir_mut(from);
569 if let Some(serialize_pipeline) = serialize {
570 sender_builder.add_dfir(
571 parse_quote! {
572 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
573 },
574 None,
575 Some(&format!("send{}", tag_id)),
577 );
578 } else {
579 sender_builder.add_dfir(
580 parse_quote! {
581 #input_ident -> dest_sink(#sink);
582 },
583 None,
584 Some(&format!("send{}", tag_id)),
585 );
586 }
587
588 let receiver_builder = self.get_dfir_mut(to);
589 if let Some(deserialize_pipeline) = deserialize {
590 receiver_builder.add_dfir(
591 parse_quote! {
592 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
593 },
594 None,
595 Some(&format!("recv{}", tag_id)),
596 );
597 } else {
598 receiver_builder.add_dfir(
599 parse_quote! {
600 #out_ident = source_stream(#source);
601 },
602 None,
603 Some(&format!("recv{}", tag_id)),
604 );
605 }
606 }
607
608 fn create_external_source(
609 &mut self,
610 on: &LocationId,
611 source_expr: syn::Expr,
612 out_ident: &syn::Ident,
613 deserialize: Option<&DebugExpr>,
614 tag_id: usize,
615 ) {
616 let receiver_builder = self.get_dfir_mut(on);
617 if let Some(deserialize_pipeline) = deserialize {
618 receiver_builder.add_dfir(
619 parse_quote! {
620 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
621 },
622 None,
623 Some(&format!("recv{}", tag_id)),
624 );
625 } else {
626 receiver_builder.add_dfir(
627 parse_quote! {
628 #out_ident = source_stream(#source_expr);
629 },
630 None,
631 Some(&format!("recv{}", tag_id)),
632 );
633 }
634 }
635
636 fn create_external_output(
637 &mut self,
638 on: &LocationId,
639 sink_expr: syn::Expr,
640 input_ident: &syn::Ident,
641 serialize: Option<&DebugExpr>,
642 tag_id: usize,
643 ) {
644 let sender_builder = self.get_dfir_mut(on);
645 if let Some(serialize_fn) = serialize {
646 sender_builder.add_dfir(
647 parse_quote! {
648 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
649 },
650 None,
651 Some(&format!("send{}", tag_id)),
653 );
654 } else {
655 sender_builder.add_dfir(
656 parse_quote! {
657 #input_ident -> dest_sink(#sink_expr);
658 },
659 None,
660 Some(&format!("send{}", tag_id)),
661 );
662 }
663 }
664}
665
666#[cfg(feature = "build")]
667pub enum BuildersOrCallback<'a, L, N>
668where
669 L: FnMut(&mut HydroRoot, &mut usize),
670 N: FnMut(&mut HydroNode, &mut usize),
671{
672 Builders(&'a mut dyn DfirBuilder),
673 Callback(L, N),
674}
675
676#[derive(Debug, Hash)]
680pub enum HydroRoot {
681 ForEach {
682 f: DebugExpr,
683 input: Box<HydroNode>,
684 op_metadata: HydroIrOpMetadata,
685 },
686 SendExternal {
687 to_external_key: LocationKey,
688 to_port_id: ExternalPortId,
689 to_many: bool,
690 unpaired: bool,
691 serialize_fn: Option<DebugExpr>,
692 instantiate_fn: DebugInstantiate,
693 input: Box<HydroNode>,
694 op_metadata: HydroIrOpMetadata,
695 },
696 DestSink {
697 sink: DebugExpr,
698 input: Box<HydroNode>,
699 op_metadata: HydroIrOpMetadata,
700 },
701 CycleSink {
702 cycle_id: CycleId,
703 input: Box<HydroNode>,
704 op_metadata: HydroIrOpMetadata,
705 },
706 EmbeddedOutput {
707 ident: syn::Ident,
708 input: Box<HydroNode>,
709 op_metadata: HydroIrOpMetadata,
710 },
711 Null {
712 input: Box<HydroNode>,
713 op_metadata: HydroIrOpMetadata,
714 },
715}
716
717impl HydroRoot {
718 #[cfg(feature = "build")]
719 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
720 pub fn compile_network<'a, D>(
721 &mut self,
722 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
723 seen_tees: &mut SeenSharedNodes,
724 seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
725 processes: &SparseSecondaryMap<LocationKey, D::Process>,
726 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
727 externals: &SparseSecondaryMap<LocationKey, D::External>,
728 env: &mut D::InstantiateEnv,
729 ) where
730 D: Deploy<'a>,
731 {
732 let refcell_extra_stmts = RefCell::new(extra_stmts);
733 let refcell_env = RefCell::new(env);
734 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
735 self.transform_bottom_up(
736 &mut |l| {
737 if let HydroRoot::SendExternal {
738 input,
739 to_external_key,
740 to_port_id,
741 to_many,
742 unpaired,
743 instantiate_fn,
744 ..
745 } = l
746 {
747 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
748 DebugInstantiate::Building => {
749 let to_node = externals
750 .get(*to_external_key)
751 .unwrap_or_else(|| {
752 panic!("A external used in the graph was not instantiated: {}", to_external_key)
753 })
754 .clone();
755
756 match input.metadata().location_id.root() {
757 &LocationId::Process(process_key) => {
758 if *to_many {
759 (
760 (
761 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
762 parse_quote!(DUMMY),
763 ),
764 Box::new(|| {}) as Box<dyn FnOnce()>,
765 )
766 } else {
767 let from_node = processes
768 .get(process_key)
769 .unwrap_or_else(|| {
770 panic!("A process used in the graph was not instantiated: {}", process_key)
771 })
772 .clone();
773
774 let sink_port = from_node.next_port();
775 let source_port = to_node.next_port();
776
777 if *unpaired {
778 use stageleft::quote_type;
779 use tokio_util::codec::LengthDelimitedCodec;
780
781 to_node.register(*to_port_id, source_port.clone());
782
783 let _ = D::e2o_source(
784 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
785 &to_node, &source_port,
786 &from_node, &sink_port,
787 "e_type::<LengthDelimitedCodec>(),
788 format!("{}_{}", *to_external_key, *to_port_id)
789 );
790 }
791
792 (
793 (
794 D::o2e_sink(
795 &from_node,
796 &sink_port,
797 &to_node,
798 &source_port,
799 format!("{}_{}", *to_external_key, *to_port_id)
800 ),
801 parse_quote!(DUMMY),
802 ),
803 if *unpaired {
804 D::e2o_connect(
805 &to_node,
806 &source_port,
807 &from_node,
808 &sink_port,
809 *to_many,
810 NetworkHint::Auto,
811 )
812 } else {
813 Box::new(|| {}) as Box<dyn FnOnce()>
814 },
815 )
816 }
817 }
818 LocationId::Cluster(cluster_key) => {
819 let from_node = clusters
820 .get(*cluster_key)
821 .unwrap_or_else(|| {
822 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
823 })
824 .clone();
825
826 let sink_port = from_node.next_port();
827 let source_port = to_node.next_port();
828
829 if *unpaired {
830 to_node.register(*to_port_id, source_port.clone());
831 }
832
833 (
834 (
835 D::m2e_sink(
836 &from_node,
837 &sink_port,
838 &to_node,
839 &source_port,
840 format!("{}_{}", *to_external_key, *to_port_id)
841 ),
842 parse_quote!(DUMMY),
843 ),
844 Box::new(|| {}) as Box<dyn FnOnce()>,
845 )
846 }
847 _ => panic!()
848 }
849 },
850
851 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
852 };
853
854 *instantiate_fn = DebugInstantiateFinalized {
855 sink: sink_expr,
856 source: source_expr,
857 connect_fn: Some(connect_fn),
858 }
859 .into();
860 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
861 let element_type = match &input.metadata().collection_kind {
862 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
863 _ => panic!("Embedded output must have Stream collection kind"),
864 };
865 let location_key = match input.metadata().location_id.root() {
866 LocationId::Process(key) | LocationId::Cluster(key) => *key,
867 _ => panic!("Embedded output must be on a process or cluster"),
868 };
869 D::register_embedded_output(
870 &mut refcell_env.borrow_mut(),
871 location_key,
872 ident,
873 &element_type,
874 );
875 }
876 },
877 &mut |n| {
878 if let HydroNode::Network {
879 name,
880 networking_info,
881 input,
882 instantiate_fn,
883 metadata,
884 ..
885 } = n
886 {
887 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
888 DebugInstantiate::Building => instantiate_network::<D>(
889 &mut refcell_env.borrow_mut(),
890 input.metadata().location_id.root(),
891 metadata.location_id.root(),
892 processes,
893 clusters,
894 name.as_deref(),
895 networking_info,
896 ),
897
898 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
899 };
900
901 *instantiate_fn = DebugInstantiateFinalized {
902 sink: sink_expr,
903 source: source_expr,
904 connect_fn: Some(connect_fn),
905 }
906 .into();
907 } else if let HydroNode::ExternalInput {
908 from_external_key,
909 from_port_id,
910 from_many,
911 codec_type,
912 port_hint,
913 instantiate_fn,
914 metadata,
915 ..
916 } = n
917 {
918 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
919 DebugInstantiate::Building => {
920 let from_node = externals
921 .get(*from_external_key)
922 .unwrap_or_else(|| {
923 panic!(
924 "A external used in the graph was not instantiated: {}",
925 from_external_key,
926 )
927 })
928 .clone();
929
930 match metadata.location_id.root() {
931 &LocationId::Process(process_key) => {
932 let to_node = processes
933 .get(process_key)
934 .unwrap_or_else(|| {
935 panic!("A process used in the graph was not instantiated: {}", process_key)
936 })
937 .clone();
938
939 let sink_port = from_node.next_port();
940 let source_port = to_node.next_port();
941
942 from_node.register(*from_port_id, sink_port.clone());
943
944 (
945 (
946 parse_quote!(DUMMY),
947 if *from_many {
948 D::e2o_many_source(
949 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
950 &to_node, &source_port,
951 codec_type.0.as_ref(),
952 format!("{}_{}", *from_external_key, *from_port_id)
953 )
954 } else {
955 D::e2o_source(
956 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
957 &from_node, &sink_port,
958 &to_node, &source_port,
959 codec_type.0.as_ref(),
960 format!("{}_{}", *from_external_key, *from_port_id)
961 )
962 },
963 ),
964 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
965 )
966 }
967 LocationId::Cluster(cluster_key) => {
968 let to_node = clusters
969 .get(*cluster_key)
970 .unwrap_or_else(|| {
971 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
972 })
973 .clone();
974
975 let sink_port = from_node.next_port();
976 let source_port = to_node.next_port();
977
978 from_node.register(*from_port_id, sink_port.clone());
979
980 (
981 (
982 parse_quote!(DUMMY),
983 D::e2m_source(
984 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
985 &from_node, &sink_port,
986 &to_node, &source_port,
987 codec_type.0.as_ref(),
988 format!("{}_{}", *from_external_key, *from_port_id)
989 ),
990 ),
991 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
992 )
993 }
994 _ => panic!()
995 }
996 },
997
998 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
999 };
1000
1001 *instantiate_fn = DebugInstantiateFinalized {
1002 sink: sink_expr,
1003 source: source_expr,
1004 connect_fn: Some(connect_fn),
1005 }
1006 .into();
1007 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1008 let element_type = match &metadata.collection_kind {
1009 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1010 _ => panic!("Embedded source must have Stream collection kind"),
1011 };
1012 let location_key = match metadata.location_id.root() {
1013 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1014 _ => panic!("Embedded source must be on a process or cluster"),
1015 };
1016 D::register_embedded_stream_input(
1017 &mut refcell_env.borrow_mut(),
1018 location_key,
1019 ident,
1020 &element_type,
1021 );
1022 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1023 let element_type = match &metadata.collection_kind {
1024 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1025 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1026 };
1027 let location_key = match metadata.location_id.root() {
1028 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1029 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1030 };
1031 D::register_embedded_singleton_input(
1032 &mut refcell_env.borrow_mut(),
1033 location_key,
1034 ident,
1035 &element_type,
1036 );
1037 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1038 match state {
1039 ClusterMembersState::Uninit => {
1040 let at_location = metadata.location_id.root().clone();
1041 let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
1042 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1043 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1045 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1046 &(),
1047 );
1048 *state = ClusterMembersState::Stream(expr.into());
1049 } else {
1050 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1052 }
1053 }
1054 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1055 panic!("cluster members already finalized");
1056 }
1057 }
1058 }
1059 },
1060 seen_tees,
1061 false,
1062 );
1063 }
1064
1065 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1066 self.transform_bottom_up(
1067 &mut |l| {
1068 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1069 match instantiate_fn {
1070 DebugInstantiate::Building => panic!("network not built"),
1071
1072 DebugInstantiate::Finalized(finalized) => {
1073 (finalized.connect_fn.take().unwrap())();
1074 }
1075 }
1076 }
1077 },
1078 &mut |n| {
1079 if let HydroNode::Network { instantiate_fn, .. }
1080 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1081 {
1082 match instantiate_fn {
1083 DebugInstantiate::Building => panic!("network not built"),
1084
1085 DebugInstantiate::Finalized(finalized) => {
1086 (finalized.connect_fn.take().unwrap())();
1087 }
1088 }
1089 }
1090 },
1091 seen_tees,
1092 false,
1093 );
1094 }
1095
1096 pub fn transform_bottom_up(
1097 &mut self,
1098 transform_root: &mut impl FnMut(&mut HydroRoot),
1099 transform_node: &mut impl FnMut(&mut HydroNode),
1100 seen_tees: &mut SeenSharedNodes,
1101 check_well_formed: bool,
1102 ) {
1103 self.transform_children(
1104 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1105 seen_tees,
1106 );
1107
1108 transform_root(self);
1109 }
1110
1111 pub fn transform_children(
1112 &mut self,
1113 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1114 seen_tees: &mut SeenSharedNodes,
1115 ) {
1116 match self {
1117 HydroRoot::ForEach { input, .. }
1118 | HydroRoot::SendExternal { input, .. }
1119 | HydroRoot::DestSink { input, .. }
1120 | HydroRoot::CycleSink { input, .. }
1121 | HydroRoot::EmbeddedOutput { input, .. }
1122 | HydroRoot::Null { input, .. } => {
1123 transform(input, seen_tees);
1124 }
1125 }
1126 }
1127
1128 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1129 match self {
1130 HydroRoot::ForEach {
1131 f,
1132 input,
1133 op_metadata,
1134 } => HydroRoot::ForEach {
1135 f: f.clone(),
1136 input: Box::new(input.deep_clone(seen_tees)),
1137 op_metadata: op_metadata.clone(),
1138 },
1139 HydroRoot::SendExternal {
1140 to_external_key,
1141 to_port_id,
1142 to_many,
1143 unpaired,
1144 serialize_fn,
1145 instantiate_fn,
1146 input,
1147 op_metadata,
1148 } => HydroRoot::SendExternal {
1149 to_external_key: *to_external_key,
1150 to_port_id: *to_port_id,
1151 to_many: *to_many,
1152 unpaired: *unpaired,
1153 serialize_fn: serialize_fn.clone(),
1154 instantiate_fn: instantiate_fn.clone(),
1155 input: Box::new(input.deep_clone(seen_tees)),
1156 op_metadata: op_metadata.clone(),
1157 },
1158 HydroRoot::DestSink {
1159 sink,
1160 input,
1161 op_metadata,
1162 } => HydroRoot::DestSink {
1163 sink: sink.clone(),
1164 input: Box::new(input.deep_clone(seen_tees)),
1165 op_metadata: op_metadata.clone(),
1166 },
1167 HydroRoot::CycleSink {
1168 cycle_id,
1169 input,
1170 op_metadata,
1171 } => HydroRoot::CycleSink {
1172 cycle_id: *cycle_id,
1173 input: Box::new(input.deep_clone(seen_tees)),
1174 op_metadata: op_metadata.clone(),
1175 },
1176 HydroRoot::EmbeddedOutput {
1177 ident,
1178 input,
1179 op_metadata,
1180 } => HydroRoot::EmbeddedOutput {
1181 ident: ident.clone(),
1182 input: Box::new(input.deep_clone(seen_tees)),
1183 op_metadata: op_metadata.clone(),
1184 },
1185 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1186 input: Box::new(input.deep_clone(seen_tees)),
1187 op_metadata: op_metadata.clone(),
1188 },
1189 }
1190 }
1191
1192 #[cfg(feature = "build")]
1193 pub fn emit(
1194 &mut self,
1195 graph_builders: &mut dyn DfirBuilder,
1196 seen_tees: &mut SeenSharedNodes,
1197 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1198 next_stmt_id: &mut usize,
1199 ) {
1200 self.emit_core(
1201 &mut BuildersOrCallback::<
1202 fn(&mut HydroRoot, &mut usize),
1203 fn(&mut HydroNode, &mut usize),
1204 >::Builders(graph_builders),
1205 seen_tees,
1206 built_tees,
1207 next_stmt_id,
1208 );
1209 }
1210
1211 #[cfg(feature = "build")]
1212 pub fn emit_core(
1213 &mut self,
1214 builders_or_callback: &mut BuildersOrCallback<
1215 impl FnMut(&mut HydroRoot, &mut usize),
1216 impl FnMut(&mut HydroNode, &mut usize),
1217 >,
1218 seen_tees: &mut SeenSharedNodes,
1219 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1220 next_stmt_id: &mut usize,
1221 ) {
1222 match self {
1223 HydroRoot::ForEach { f, input, .. } => {
1224 let input_ident =
1225 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1226
1227 match builders_or_callback {
1228 BuildersOrCallback::Builders(graph_builders) => {
1229 graph_builders
1230 .get_dfir_mut(&input.metadata().location_id)
1231 .add_dfir(
1232 parse_quote! {
1233 #input_ident -> for_each(#f);
1234 },
1235 None,
1236 Some(&next_stmt_id.to_string()),
1237 );
1238 }
1239 BuildersOrCallback::Callback(leaf_callback, _) => {
1240 leaf_callback(self, next_stmt_id);
1241 }
1242 }
1243
1244 *next_stmt_id += 1;
1245 }
1246
1247 HydroRoot::SendExternal {
1248 serialize_fn,
1249 instantiate_fn,
1250 input,
1251 ..
1252 } => {
1253 let input_ident =
1254 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1255
1256 match builders_or_callback {
1257 BuildersOrCallback::Builders(graph_builders) => {
1258 let (sink_expr, _) = match instantiate_fn {
1259 DebugInstantiate::Building => (
1260 syn::parse_quote!(DUMMY_SINK),
1261 syn::parse_quote!(DUMMY_SOURCE),
1262 ),
1263
1264 DebugInstantiate::Finalized(finalized) => {
1265 (finalized.sink.clone(), finalized.source.clone())
1266 }
1267 };
1268
1269 graph_builders.create_external_output(
1270 &input.metadata().location_id,
1271 sink_expr,
1272 &input_ident,
1273 serialize_fn.as_ref(),
1274 *next_stmt_id,
1275 );
1276 }
1277 BuildersOrCallback::Callback(leaf_callback, _) => {
1278 leaf_callback(self, next_stmt_id);
1279 }
1280 }
1281
1282 *next_stmt_id += 1;
1283 }
1284
1285 HydroRoot::DestSink { sink, input, .. } => {
1286 let input_ident =
1287 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1288
1289 match builders_or_callback {
1290 BuildersOrCallback::Builders(graph_builders) => {
1291 graph_builders
1292 .get_dfir_mut(&input.metadata().location_id)
1293 .add_dfir(
1294 parse_quote! {
1295 #input_ident -> dest_sink(#sink);
1296 },
1297 None,
1298 Some(&next_stmt_id.to_string()),
1299 );
1300 }
1301 BuildersOrCallback::Callback(leaf_callback, _) => {
1302 leaf_callback(self, next_stmt_id);
1303 }
1304 }
1305
1306 *next_stmt_id += 1;
1307 }
1308
1309 HydroRoot::CycleSink {
1310 cycle_id, input, ..
1311 } => {
1312 let input_ident =
1313 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1314
1315 match builders_or_callback {
1316 BuildersOrCallback::Builders(graph_builders) => {
1317 let elem_type: syn::Type = match &input.metadata().collection_kind {
1318 CollectionKind::KeyedSingleton {
1319 key_type,
1320 value_type,
1321 ..
1322 }
1323 | CollectionKind::KeyedStream {
1324 key_type,
1325 value_type,
1326 ..
1327 } => {
1328 parse_quote!((#key_type, #value_type))
1329 }
1330 CollectionKind::Stream { element_type, .. }
1331 | CollectionKind::Singleton { element_type, .. }
1332 | CollectionKind::Optional { element_type, .. } => {
1333 parse_quote!(#element_type)
1334 }
1335 };
1336
1337 let cycle_id_ident = cycle_id.as_ident();
1338 graph_builders
1339 .get_dfir_mut(&input.metadata().location_id)
1340 .add_dfir(
1341 parse_quote! {
1342 #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1343 },
1344 None,
1345 None,
1346 );
1347 }
1348 BuildersOrCallback::Callback(_, _) => {}
1350 }
1351 }
1352
1353 HydroRoot::EmbeddedOutput { ident, input, .. } => {
1354 let input_ident =
1355 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1356
1357 match builders_or_callback {
1358 BuildersOrCallback::Builders(graph_builders) => {
1359 graph_builders
1360 .get_dfir_mut(&input.metadata().location_id)
1361 .add_dfir(
1362 parse_quote! {
1363 #input_ident -> for_each(&mut #ident);
1364 },
1365 None,
1366 Some(&next_stmt_id.to_string()),
1367 );
1368 }
1369 BuildersOrCallback::Callback(leaf_callback, _) => {
1370 leaf_callback(self, next_stmt_id);
1371 }
1372 }
1373
1374 *next_stmt_id += 1;
1375 }
1376
1377 HydroRoot::Null { input, .. } => {
1378 let input_ident =
1379 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1380
1381 match builders_or_callback {
1382 BuildersOrCallback::Builders(graph_builders) => {
1383 graph_builders
1384 .get_dfir_mut(&input.metadata().location_id)
1385 .add_dfir(
1386 parse_quote! {
1387 #input_ident -> for_each(|_| {});
1388 },
1389 None,
1390 Some(&next_stmt_id.to_string()),
1391 );
1392 }
1393 BuildersOrCallback::Callback(leaf_callback, _) => {
1394 leaf_callback(self, next_stmt_id);
1395 }
1396 }
1397
1398 *next_stmt_id += 1;
1399 }
1400 }
1401 }
1402
1403 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1404 match self {
1405 HydroRoot::ForEach { op_metadata, .. }
1406 | HydroRoot::SendExternal { op_metadata, .. }
1407 | HydroRoot::DestSink { op_metadata, .. }
1408 | HydroRoot::CycleSink { op_metadata, .. }
1409 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1410 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1411 }
1412 }
1413
1414 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1415 match self {
1416 HydroRoot::ForEach { op_metadata, .. }
1417 | HydroRoot::SendExternal { op_metadata, .. }
1418 | HydroRoot::DestSink { op_metadata, .. }
1419 | HydroRoot::CycleSink { op_metadata, .. }
1420 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1421 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1422 }
1423 }
1424
1425 pub fn input(&self) -> &HydroNode {
1426 match self {
1427 HydroRoot::ForEach { input, .. }
1428 | HydroRoot::SendExternal { input, .. }
1429 | HydroRoot::DestSink { input, .. }
1430 | HydroRoot::CycleSink { input, .. }
1431 | HydroRoot::EmbeddedOutput { input, .. }
1432 | HydroRoot::Null { input, .. } => input,
1433 }
1434 }
1435
1436 pub fn input_metadata(&self) -> &HydroIrMetadata {
1437 self.input().metadata()
1438 }
1439
1440 pub fn print_root(&self) -> String {
1441 match self {
1442 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1443 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1444 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1445 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1446 HydroRoot::EmbeddedOutput { ident, .. } => {
1447 format!("EmbeddedOutput({})", ident)
1448 }
1449 HydroRoot::Null { .. } => "Null".to_owned(),
1450 }
1451 }
1452
1453 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1454 match self {
1455 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1456 transform(f);
1457 }
1458 HydroRoot::SendExternal { .. }
1459 | HydroRoot::CycleSink { .. }
1460 | HydroRoot::EmbeddedOutput { .. }
1461 | HydroRoot::Null { .. } => {}
1462 }
1463 }
1464}
1465
1466#[cfg(feature = "build")]
1467fn tick_of(loc: &LocationId) -> Option<ClockId> {
1468 match loc {
1469 LocationId::Tick(id, _) => Some(*id),
1470 LocationId::Atomic(inner) => tick_of(inner),
1471 _ => None,
1472 }
1473}
1474
1475#[cfg(feature = "build")]
1476fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
1477 match loc {
1478 LocationId::Tick(id, inner) => {
1479 *id = uf_find(uf, *id);
1480 remap_location(inner, uf);
1481 }
1482 LocationId::Atomic(inner) => {
1483 remap_location(inner, uf);
1484 }
1485 LocationId::Process(_) | LocationId::Cluster(_) => {}
1486 }
1487}
1488
1489#[cfg(feature = "build")]
1490fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
1491 let p = *parent.get(&x).unwrap_or(&x);
1492 if p == x {
1493 return x;
1494 }
1495 let root = uf_find(parent, p);
1496 parent.insert(x, root);
1497 root
1498}
1499
1500#[cfg(feature = "build")]
1501fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
1502 let ra = uf_find(parent, a);
1503 let rb = uf_find(parent, b);
1504 if ra != rb {
1505 parent.insert(ra, rb);
1506 }
1507}
1508
1509#[cfg(feature = "build")]
1513pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
1514 let mut uf: HashMap<ClockId, ClockId> = HashMap::new();
1515
1516 transform_bottom_up(
1518 ir,
1519 &mut |_| {},
1520 &mut |node: &mut HydroNode| {
1521 if let HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } =
1522 node
1523 && let (Some(a), Some(b)) = (
1524 tick_of(&inner.metadata().location_id),
1525 tick_of(&metadata.location_id),
1526 )
1527 {
1528 uf_union(&mut uf, a, b);
1529 }
1530 },
1531 false,
1532 );
1533
1534 transform_bottom_up(
1536 ir,
1537 &mut |_| {},
1538 &mut |node: &mut HydroNode| {
1539 remap_location(&mut node.metadata_mut().location_id, &mut uf);
1540 },
1541 false,
1542 );
1543}
1544
1545#[cfg(feature = "build")]
1546pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1547 let mut builders = SecondaryMap::new();
1548 let mut seen_tees = HashMap::new();
1549 let mut built_tees = HashMap::new();
1550 let mut next_stmt_id = 0;
1551 for leaf in ir {
1552 leaf.emit(
1553 &mut builders,
1554 &mut seen_tees,
1555 &mut built_tees,
1556 &mut next_stmt_id,
1557 );
1558 }
1559 builders
1560}
1561
1562#[cfg(feature = "build")]
1563pub fn traverse_dfir(
1564 ir: &mut [HydroRoot],
1565 transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1566 transform_node: impl FnMut(&mut HydroNode, &mut usize),
1567) {
1568 let mut seen_tees = HashMap::new();
1569 let mut built_tees = HashMap::new();
1570 let mut next_stmt_id = 0;
1571 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1572 ir.iter_mut().for_each(|leaf| {
1573 leaf.emit_core(
1574 &mut callback,
1575 &mut seen_tees,
1576 &mut built_tees,
1577 &mut next_stmt_id,
1578 );
1579 });
1580}
1581
1582pub fn transform_bottom_up(
1583 ir: &mut [HydroRoot],
1584 transform_root: &mut impl FnMut(&mut HydroRoot),
1585 transform_node: &mut impl FnMut(&mut HydroNode),
1586 check_well_formed: bool,
1587) {
1588 let mut seen_tees = HashMap::new();
1589 ir.iter_mut().for_each(|leaf| {
1590 leaf.transform_bottom_up(
1591 transform_root,
1592 transform_node,
1593 &mut seen_tees,
1594 check_well_formed,
1595 );
1596 });
1597}
1598
1599pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1600 let mut seen_tees = HashMap::new();
1601 ir.iter()
1602 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1603 .collect()
1604}
1605
1606type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1607thread_local! {
1608 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1609}
1610
1611pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1612 PRINTED_TEES.with(|printed_tees| {
1613 let mut printed_tees_mut = printed_tees.borrow_mut();
1614 *printed_tees_mut = Some((0, HashMap::new()));
1615 drop(printed_tees_mut);
1616
1617 let ret = f();
1618
1619 let mut printed_tees_mut = printed_tees.borrow_mut();
1620 *printed_tees_mut = None;
1621
1622 ret
1623 })
1624}
1625
1626pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
1627
1628impl SharedNode {
1629 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1630 Rc::as_ptr(&self.0)
1631 }
1632}
1633
1634impl Debug for SharedNode {
1635 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1636 PRINTED_TEES.with(|printed_tees| {
1637 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1638 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1639
1640 if let Some(printed_tees_mut) = printed_tees_mut {
1641 if let Some(existing) = printed_tees_mut
1642 .1
1643 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1644 {
1645 write!(f, "<shared {}>", existing)
1646 } else {
1647 let next_id = printed_tees_mut.0;
1648 printed_tees_mut.0 += 1;
1649 printed_tees_mut
1650 .1
1651 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1652 drop(printed_tees_mut_borrow);
1653 write!(f, "<shared {}>: ", next_id)?;
1654 Debug::fmt(&self.0.borrow(), f)
1655 }
1656 } else {
1657 drop(printed_tees_mut_borrow);
1658 write!(f, "<shared>: ")?;
1659 Debug::fmt(&self.0.borrow(), f)
1660 }
1661 })
1662 }
1663}
1664
1665impl Hash for SharedNode {
1666 fn hash<H: Hasher>(&self, state: &mut H) {
1667 self.0.borrow_mut().hash(state);
1668 }
1669}
1670
1671#[derive(Clone, PartialEq, Eq, Debug)]
1672pub enum BoundKind {
1673 Unbounded,
1674 Bounded,
1675}
1676
1677#[derive(Clone, PartialEq, Eq, Debug)]
1678pub enum StreamOrder {
1679 NoOrder,
1680 TotalOrder,
1681}
1682
1683#[derive(Clone, PartialEq, Eq, Debug)]
1684pub enum StreamRetry {
1685 AtLeastOnce,
1686 ExactlyOnce,
1687}
1688
1689#[derive(Clone, PartialEq, Eq, Debug)]
1690pub enum KeyedSingletonBoundKind {
1691 Unbounded,
1692 MonotonicValue,
1693 BoundedValue,
1694 Bounded,
1695}
1696
1697#[derive(Clone, PartialEq, Eq, Debug)]
1698pub enum SingletonBoundKind {
1699 Unbounded,
1700 Monotonic,
1701 Bounded,
1702}
1703
1704#[derive(Clone, PartialEq, Eq, Debug)]
1705pub enum CollectionKind {
1706 Stream {
1707 bound: BoundKind,
1708 order: StreamOrder,
1709 retry: StreamRetry,
1710 element_type: DebugType,
1711 },
1712 Singleton {
1713 bound: SingletonBoundKind,
1714 element_type: DebugType,
1715 },
1716 Optional {
1717 bound: BoundKind,
1718 element_type: DebugType,
1719 },
1720 KeyedStream {
1721 bound: BoundKind,
1722 value_order: StreamOrder,
1723 value_retry: StreamRetry,
1724 key_type: DebugType,
1725 value_type: DebugType,
1726 },
1727 KeyedSingleton {
1728 bound: KeyedSingletonBoundKind,
1729 key_type: DebugType,
1730 value_type: DebugType,
1731 },
1732}
1733
1734impl CollectionKind {
1735 pub fn is_bounded(&self) -> bool {
1736 matches!(
1737 self,
1738 CollectionKind::Stream {
1739 bound: BoundKind::Bounded,
1740 ..
1741 } | CollectionKind::Singleton {
1742 bound: SingletonBoundKind::Bounded,
1743 ..
1744 } | CollectionKind::Optional {
1745 bound: BoundKind::Bounded,
1746 ..
1747 } | CollectionKind::KeyedStream {
1748 bound: BoundKind::Bounded,
1749 ..
1750 } | CollectionKind::KeyedSingleton {
1751 bound: KeyedSingletonBoundKind::Bounded,
1752 ..
1753 }
1754 )
1755 }
1756}
1757
1758#[derive(Clone)]
1759pub struct HydroIrMetadata {
1760 pub location_id: LocationId,
1761 pub collection_kind: CollectionKind,
1762 pub cardinality: Option<usize>,
1763 pub tag: Option<String>,
1764 pub op: HydroIrOpMetadata,
1765}
1766
1767impl Hash for HydroIrMetadata {
1769 fn hash<H: Hasher>(&self, _: &mut H) {}
1770}
1771
1772impl PartialEq for HydroIrMetadata {
1773 fn eq(&self, _: &Self) -> bool {
1774 true
1775 }
1776}
1777
1778impl Eq for HydroIrMetadata {}
1779
1780impl Debug for HydroIrMetadata {
1781 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1782 f.debug_struct("HydroIrMetadata")
1783 .field("location_id", &self.location_id)
1784 .field("collection_kind", &self.collection_kind)
1785 .finish()
1786 }
1787}
1788
1789#[derive(Clone)]
1792pub struct HydroIrOpMetadata {
1793 pub backtrace: Backtrace,
1794 pub cpu_usage: Option<f64>,
1795 pub network_recv_cpu_usage: Option<f64>,
1796 pub id: Option<usize>,
1797}
1798
1799impl HydroIrOpMetadata {
1800 #[expect(
1801 clippy::new_without_default,
1802 reason = "explicit calls to new ensure correct backtrace bounds"
1803 )]
1804 pub fn new() -> HydroIrOpMetadata {
1805 Self::new_with_skip(1)
1806 }
1807
1808 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1809 HydroIrOpMetadata {
1810 backtrace: Backtrace::get_backtrace(2 + skip_count),
1811 cpu_usage: None,
1812 network_recv_cpu_usage: None,
1813 id: None,
1814 }
1815 }
1816}
1817
1818impl Debug for HydroIrOpMetadata {
1819 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1820 f.debug_struct("HydroIrOpMetadata").finish()
1821 }
1822}
1823
1824impl Hash for HydroIrOpMetadata {
1825 fn hash<H: Hasher>(&self, _: &mut H) {}
1826}
1827
1828#[derive(Debug, Hash)]
1831pub enum HydroNode {
1832 Placeholder,
1833
1834 Cast {
1842 inner: Box<HydroNode>,
1843 metadata: HydroIrMetadata,
1844 },
1845
1846 ObserveNonDet {
1852 inner: Box<HydroNode>,
1853 trusted: bool, metadata: HydroIrMetadata,
1855 },
1856
1857 Source {
1858 source: HydroSource,
1859 metadata: HydroIrMetadata,
1860 },
1861
1862 SingletonSource {
1863 value: DebugExpr,
1864 first_tick_only: bool,
1865 metadata: HydroIrMetadata,
1866 },
1867
1868 CycleSource {
1869 cycle_id: CycleId,
1870 metadata: HydroIrMetadata,
1871 },
1872
1873 Tee {
1874 inner: SharedNode,
1875 metadata: HydroIrMetadata,
1876 },
1877
1878 Partition {
1879 inner: SharedNode,
1880 f: DebugExpr,
1881 is_true: bool,
1882 metadata: HydroIrMetadata,
1883 },
1884
1885 BeginAtomic {
1886 inner: Box<HydroNode>,
1887 metadata: HydroIrMetadata,
1888 },
1889
1890 EndAtomic {
1891 inner: Box<HydroNode>,
1892 metadata: HydroIrMetadata,
1893 },
1894
1895 Batch {
1896 inner: Box<HydroNode>,
1897 metadata: HydroIrMetadata,
1898 },
1899
1900 YieldConcat {
1901 inner: Box<HydroNode>,
1902 metadata: HydroIrMetadata,
1903 },
1904
1905 Chain {
1906 first: Box<HydroNode>,
1907 second: Box<HydroNode>,
1908 metadata: HydroIrMetadata,
1909 },
1910
1911 ChainFirst {
1912 first: Box<HydroNode>,
1913 second: Box<HydroNode>,
1914 metadata: HydroIrMetadata,
1915 },
1916
1917 CrossProduct {
1918 left: Box<HydroNode>,
1919 right: Box<HydroNode>,
1920 metadata: HydroIrMetadata,
1921 },
1922
1923 CrossSingleton {
1924 left: Box<HydroNode>,
1925 right: Box<HydroNode>,
1926 metadata: HydroIrMetadata,
1927 },
1928
1929 Join {
1930 left: Box<HydroNode>,
1931 right: Box<HydroNode>,
1932 metadata: HydroIrMetadata,
1933 },
1934
1935 Difference {
1936 pos: Box<HydroNode>,
1937 neg: Box<HydroNode>,
1938 metadata: HydroIrMetadata,
1939 },
1940
1941 AntiJoin {
1942 pos: Box<HydroNode>,
1943 neg: Box<HydroNode>,
1944 metadata: HydroIrMetadata,
1945 },
1946
1947 ResolveFutures {
1948 input: Box<HydroNode>,
1949 metadata: HydroIrMetadata,
1950 },
1951 ResolveFuturesBlocking {
1952 input: Box<HydroNode>,
1953 metadata: HydroIrMetadata,
1954 },
1955 ResolveFuturesOrdered {
1956 input: Box<HydroNode>,
1957 metadata: HydroIrMetadata,
1958 },
1959
1960 Map {
1961 f: DebugExpr,
1962 input: Box<HydroNode>,
1963 metadata: HydroIrMetadata,
1964 },
1965 FlatMap {
1966 f: DebugExpr,
1967 input: Box<HydroNode>,
1968 metadata: HydroIrMetadata,
1969 },
1970 FlatMapStreamBlocking {
1971 f: DebugExpr,
1972 input: Box<HydroNode>,
1973 metadata: HydroIrMetadata,
1974 },
1975 Filter {
1976 f: DebugExpr,
1977 input: Box<HydroNode>,
1978 metadata: HydroIrMetadata,
1979 },
1980 FilterMap {
1981 f: DebugExpr,
1982 input: Box<HydroNode>,
1983 metadata: HydroIrMetadata,
1984 },
1985
1986 DeferTick {
1987 input: Box<HydroNode>,
1988 metadata: HydroIrMetadata,
1989 },
1990 Enumerate {
1991 input: Box<HydroNode>,
1992 metadata: HydroIrMetadata,
1993 },
1994 Inspect {
1995 f: DebugExpr,
1996 input: Box<HydroNode>,
1997 metadata: HydroIrMetadata,
1998 },
1999
2000 Unique {
2001 input: Box<HydroNode>,
2002 metadata: HydroIrMetadata,
2003 },
2004
2005 Sort {
2006 input: Box<HydroNode>,
2007 metadata: HydroIrMetadata,
2008 },
2009 Fold {
2010 init: DebugExpr,
2011 acc: DebugExpr,
2012 input: Box<HydroNode>,
2013 metadata: HydroIrMetadata,
2014 },
2015
2016 Scan {
2017 init: DebugExpr,
2018 acc: DebugExpr,
2019 input: Box<HydroNode>,
2020 metadata: HydroIrMetadata,
2021 },
2022 FoldKeyed {
2023 init: DebugExpr,
2024 acc: DebugExpr,
2025 input: Box<HydroNode>,
2026 metadata: HydroIrMetadata,
2027 },
2028
2029 Reduce {
2030 f: DebugExpr,
2031 input: Box<HydroNode>,
2032 metadata: HydroIrMetadata,
2033 },
2034 ReduceKeyed {
2035 f: DebugExpr,
2036 input: Box<HydroNode>,
2037 metadata: HydroIrMetadata,
2038 },
2039 ReduceKeyedWatermark {
2040 f: DebugExpr,
2041 input: Box<HydroNode>,
2042 watermark: Box<HydroNode>,
2043 metadata: HydroIrMetadata,
2044 },
2045
2046 Network {
2047 name: Option<String>,
2048 networking_info: crate::networking::NetworkingInfo,
2049 serialize_fn: Option<DebugExpr>,
2050 instantiate_fn: DebugInstantiate,
2051 deserialize_fn: Option<DebugExpr>,
2052 input: Box<HydroNode>,
2053 metadata: HydroIrMetadata,
2054 },
2055
2056 ExternalInput {
2057 from_external_key: LocationKey,
2058 from_port_id: ExternalPortId,
2059 from_many: bool,
2060 codec_type: DebugType,
2061 port_hint: NetworkHint,
2062 instantiate_fn: DebugInstantiate,
2063 deserialize_fn: Option<DebugExpr>,
2064 metadata: HydroIrMetadata,
2065 },
2066
2067 Counter {
2068 tag: String,
2069 duration: DebugExpr,
2070 prefix: String,
2071 input: Box<HydroNode>,
2072 metadata: HydroIrMetadata,
2073 },
2074}
2075
2076pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
2077pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2078
2079impl HydroNode {
2080 pub fn transform_bottom_up(
2081 &mut self,
2082 transform: &mut impl FnMut(&mut HydroNode),
2083 seen_tees: &mut SeenSharedNodes,
2084 check_well_formed: bool,
2085 ) {
2086 self.transform_children(
2087 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2088 seen_tees,
2089 );
2090
2091 transform(self);
2092
2093 let self_location = self.metadata().location_id.root();
2094
2095 if check_well_formed {
2096 match &*self {
2097 HydroNode::Network { .. } => {}
2098 _ => {
2099 self.input_metadata().iter().for_each(|i| {
2100 if i.location_id.root() != self_location {
2101 panic!(
2102 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2103 i,
2104 i.location_id.root(),
2105 self,
2106 self_location
2107 )
2108 }
2109 });
2110 }
2111 }
2112 }
2113 }
2114
2115 #[inline(always)]
2116 pub fn transform_children(
2117 &mut self,
2118 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2119 seen_tees: &mut SeenSharedNodes,
2120 ) {
2121 match self {
2122 HydroNode::Placeholder => {
2123 panic!();
2124 }
2125
2126 HydroNode::Source { .. }
2127 | HydroNode::SingletonSource { .. }
2128 | HydroNode::CycleSource { .. }
2129 | HydroNode::ExternalInput { .. } => {}
2130
2131 HydroNode::Tee { inner, .. } => {
2132 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2133 *inner = SharedNode(transformed.clone());
2134 } else {
2135 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2136 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2137 let mut orig = inner.0.replace(HydroNode::Placeholder);
2138 transform(&mut orig, seen_tees);
2139 *transformed_cell.borrow_mut() = orig;
2140 *inner = SharedNode(transformed_cell);
2141 }
2142 }
2143
2144 HydroNode::Partition { inner, .. } => {
2145 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2146 *inner = SharedNode(transformed.clone());
2147 } else {
2148 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2149 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2150 let mut orig = inner.0.replace(HydroNode::Placeholder);
2151 transform(&mut orig, seen_tees);
2152 *transformed_cell.borrow_mut() = orig;
2153 *inner = SharedNode(transformed_cell);
2154 }
2155 }
2156
2157 HydroNode::Cast { inner, .. }
2158 | HydroNode::ObserveNonDet { inner, .. }
2159 | HydroNode::BeginAtomic { inner, .. }
2160 | HydroNode::EndAtomic { inner, .. }
2161 | HydroNode::Batch { inner, .. }
2162 | HydroNode::YieldConcat { inner, .. } => {
2163 transform(inner.as_mut(), seen_tees);
2164 }
2165
2166 HydroNode::Chain { first, second, .. } => {
2167 transform(first.as_mut(), seen_tees);
2168 transform(second.as_mut(), seen_tees);
2169 }
2170
2171 HydroNode::ChainFirst { first, second, .. } => {
2172 transform(first.as_mut(), seen_tees);
2173 transform(second.as_mut(), seen_tees);
2174 }
2175
2176 HydroNode::CrossSingleton { left, right, .. }
2177 | HydroNode::CrossProduct { left, right, .. }
2178 | HydroNode::Join { left, right, .. } => {
2179 transform(left.as_mut(), seen_tees);
2180 transform(right.as_mut(), seen_tees);
2181 }
2182
2183 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2184 transform(pos.as_mut(), seen_tees);
2185 transform(neg.as_mut(), seen_tees);
2186 }
2187
2188 HydroNode::ReduceKeyedWatermark {
2189 input, watermark, ..
2190 } => {
2191 transform(input.as_mut(), seen_tees);
2192 transform(watermark.as_mut(), seen_tees);
2193 }
2194
2195 HydroNode::Map { input, .. }
2196 | HydroNode::ResolveFutures { input, .. }
2197 | HydroNode::ResolveFuturesBlocking { input, .. }
2198 | HydroNode::ResolveFuturesOrdered { input, .. }
2199 | HydroNode::FlatMap { input, .. }
2200 | HydroNode::FlatMapStreamBlocking { input, .. }
2201 | HydroNode::Filter { input, .. }
2202 | HydroNode::FilterMap { input, .. }
2203 | HydroNode::Sort { input, .. }
2204 | HydroNode::DeferTick { input, .. }
2205 | HydroNode::Enumerate { input, .. }
2206 | HydroNode::Inspect { input, .. }
2207 | HydroNode::Unique { input, .. }
2208 | HydroNode::Network { input, .. }
2209 | HydroNode::Fold { input, .. }
2210 | HydroNode::Scan { input, .. }
2211 | HydroNode::FoldKeyed { input, .. }
2212 | HydroNode::Reduce { input, .. }
2213 | HydroNode::ReduceKeyed { input, .. }
2214 | HydroNode::Counter { input, .. } => {
2215 transform(input.as_mut(), seen_tees);
2216 }
2217 }
2218 }
2219
2220 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2221 match self {
2222 HydroNode::Placeholder => HydroNode::Placeholder,
2223 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2224 inner: Box::new(inner.deep_clone(seen_tees)),
2225 metadata: metadata.clone(),
2226 },
2227 HydroNode::ObserveNonDet {
2228 inner,
2229 trusted,
2230 metadata,
2231 } => HydroNode::ObserveNonDet {
2232 inner: Box::new(inner.deep_clone(seen_tees)),
2233 trusted: *trusted,
2234 metadata: metadata.clone(),
2235 },
2236 HydroNode::Source { source, metadata } => HydroNode::Source {
2237 source: source.clone(),
2238 metadata: metadata.clone(),
2239 },
2240 HydroNode::SingletonSource {
2241 value,
2242 first_tick_only,
2243 metadata,
2244 } => HydroNode::SingletonSource {
2245 value: value.clone(),
2246 first_tick_only: *first_tick_only,
2247 metadata: metadata.clone(),
2248 },
2249 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2250 cycle_id: *cycle_id,
2251 metadata: metadata.clone(),
2252 },
2253 HydroNode::Tee { inner, metadata } => {
2254 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2255 HydroNode::Tee {
2256 inner: SharedNode(transformed.clone()),
2257 metadata: metadata.clone(),
2258 }
2259 } else {
2260 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2261 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2262 let cloned = inner.0.borrow().deep_clone(seen_tees);
2263 *new_rc.borrow_mut() = cloned;
2264 HydroNode::Tee {
2265 inner: SharedNode(new_rc),
2266 metadata: metadata.clone(),
2267 }
2268 }
2269 }
2270 HydroNode::Partition {
2271 inner,
2272 f,
2273 is_true,
2274 metadata,
2275 } => {
2276 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2277 HydroNode::Partition {
2278 inner: SharedNode(transformed.clone()),
2279 f: f.clone(),
2280 is_true: *is_true,
2281 metadata: metadata.clone(),
2282 }
2283 } else {
2284 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2285 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2286 let cloned = inner.0.borrow().deep_clone(seen_tees);
2287 *new_rc.borrow_mut() = cloned;
2288 HydroNode::Partition {
2289 inner: SharedNode(new_rc),
2290 f: f.clone(),
2291 is_true: *is_true,
2292 metadata: metadata.clone(),
2293 }
2294 }
2295 }
2296 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2297 inner: Box::new(inner.deep_clone(seen_tees)),
2298 metadata: metadata.clone(),
2299 },
2300 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2301 inner: Box::new(inner.deep_clone(seen_tees)),
2302 metadata: metadata.clone(),
2303 },
2304 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2305 inner: Box::new(inner.deep_clone(seen_tees)),
2306 metadata: metadata.clone(),
2307 },
2308 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2309 inner: Box::new(inner.deep_clone(seen_tees)),
2310 metadata: metadata.clone(),
2311 },
2312 HydroNode::Chain {
2313 first,
2314 second,
2315 metadata,
2316 } => HydroNode::Chain {
2317 first: Box::new(first.deep_clone(seen_tees)),
2318 second: Box::new(second.deep_clone(seen_tees)),
2319 metadata: metadata.clone(),
2320 },
2321 HydroNode::ChainFirst {
2322 first,
2323 second,
2324 metadata,
2325 } => HydroNode::ChainFirst {
2326 first: Box::new(first.deep_clone(seen_tees)),
2327 second: Box::new(second.deep_clone(seen_tees)),
2328 metadata: metadata.clone(),
2329 },
2330 HydroNode::CrossProduct {
2331 left,
2332 right,
2333 metadata,
2334 } => HydroNode::CrossProduct {
2335 left: Box::new(left.deep_clone(seen_tees)),
2336 right: Box::new(right.deep_clone(seen_tees)),
2337 metadata: metadata.clone(),
2338 },
2339 HydroNode::CrossSingleton {
2340 left,
2341 right,
2342 metadata,
2343 } => HydroNode::CrossSingleton {
2344 left: Box::new(left.deep_clone(seen_tees)),
2345 right: Box::new(right.deep_clone(seen_tees)),
2346 metadata: metadata.clone(),
2347 },
2348 HydroNode::Join {
2349 left,
2350 right,
2351 metadata,
2352 } => HydroNode::Join {
2353 left: Box::new(left.deep_clone(seen_tees)),
2354 right: Box::new(right.deep_clone(seen_tees)),
2355 metadata: metadata.clone(),
2356 },
2357 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2358 pos: Box::new(pos.deep_clone(seen_tees)),
2359 neg: Box::new(neg.deep_clone(seen_tees)),
2360 metadata: metadata.clone(),
2361 },
2362 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2363 pos: Box::new(pos.deep_clone(seen_tees)),
2364 neg: Box::new(neg.deep_clone(seen_tees)),
2365 metadata: metadata.clone(),
2366 },
2367 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2368 input: Box::new(input.deep_clone(seen_tees)),
2369 metadata: metadata.clone(),
2370 },
2371 HydroNode::ResolveFuturesBlocking { input, metadata } => {
2372 HydroNode::ResolveFuturesBlocking {
2373 input: Box::new(input.deep_clone(seen_tees)),
2374 metadata: metadata.clone(),
2375 }
2376 }
2377 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2378 HydroNode::ResolveFuturesOrdered {
2379 input: Box::new(input.deep_clone(seen_tees)),
2380 metadata: metadata.clone(),
2381 }
2382 }
2383 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2384 f: f.clone(),
2385 input: Box::new(input.deep_clone(seen_tees)),
2386 metadata: metadata.clone(),
2387 },
2388 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2389 f: f.clone(),
2390 input: Box::new(input.deep_clone(seen_tees)),
2391 metadata: metadata.clone(),
2392 },
2393 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
2394 HydroNode::FlatMapStreamBlocking {
2395 f: f.clone(),
2396 input: Box::new(input.deep_clone(seen_tees)),
2397 metadata: metadata.clone(),
2398 }
2399 }
2400 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2401 f: f.clone(),
2402 input: Box::new(input.deep_clone(seen_tees)),
2403 metadata: metadata.clone(),
2404 },
2405 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2406 f: f.clone(),
2407 input: Box::new(input.deep_clone(seen_tees)),
2408 metadata: metadata.clone(),
2409 },
2410 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2411 input: Box::new(input.deep_clone(seen_tees)),
2412 metadata: metadata.clone(),
2413 },
2414 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2415 input: Box::new(input.deep_clone(seen_tees)),
2416 metadata: metadata.clone(),
2417 },
2418 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2419 f: f.clone(),
2420 input: Box::new(input.deep_clone(seen_tees)),
2421 metadata: metadata.clone(),
2422 },
2423 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2424 input: Box::new(input.deep_clone(seen_tees)),
2425 metadata: metadata.clone(),
2426 },
2427 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2428 input: Box::new(input.deep_clone(seen_tees)),
2429 metadata: metadata.clone(),
2430 },
2431 HydroNode::Fold {
2432 init,
2433 acc,
2434 input,
2435 metadata,
2436 } => HydroNode::Fold {
2437 init: init.clone(),
2438 acc: acc.clone(),
2439 input: Box::new(input.deep_clone(seen_tees)),
2440 metadata: metadata.clone(),
2441 },
2442 HydroNode::Scan {
2443 init,
2444 acc,
2445 input,
2446 metadata,
2447 } => HydroNode::Scan {
2448 init: init.clone(),
2449 acc: acc.clone(),
2450 input: Box::new(input.deep_clone(seen_tees)),
2451 metadata: metadata.clone(),
2452 },
2453 HydroNode::FoldKeyed {
2454 init,
2455 acc,
2456 input,
2457 metadata,
2458 } => HydroNode::FoldKeyed {
2459 init: init.clone(),
2460 acc: acc.clone(),
2461 input: Box::new(input.deep_clone(seen_tees)),
2462 metadata: metadata.clone(),
2463 },
2464 HydroNode::ReduceKeyedWatermark {
2465 f,
2466 input,
2467 watermark,
2468 metadata,
2469 } => HydroNode::ReduceKeyedWatermark {
2470 f: f.clone(),
2471 input: Box::new(input.deep_clone(seen_tees)),
2472 watermark: Box::new(watermark.deep_clone(seen_tees)),
2473 metadata: metadata.clone(),
2474 },
2475 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2476 f: f.clone(),
2477 input: Box::new(input.deep_clone(seen_tees)),
2478 metadata: metadata.clone(),
2479 },
2480 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2481 f: f.clone(),
2482 input: Box::new(input.deep_clone(seen_tees)),
2483 metadata: metadata.clone(),
2484 },
2485 HydroNode::Network {
2486 name,
2487 networking_info,
2488 serialize_fn,
2489 instantiate_fn,
2490 deserialize_fn,
2491 input,
2492 metadata,
2493 } => HydroNode::Network {
2494 name: name.clone(),
2495 networking_info: networking_info.clone(),
2496 serialize_fn: serialize_fn.clone(),
2497 instantiate_fn: instantiate_fn.clone(),
2498 deserialize_fn: deserialize_fn.clone(),
2499 input: Box::new(input.deep_clone(seen_tees)),
2500 metadata: metadata.clone(),
2501 },
2502 HydroNode::ExternalInput {
2503 from_external_key,
2504 from_port_id,
2505 from_many,
2506 codec_type,
2507 port_hint,
2508 instantiate_fn,
2509 deserialize_fn,
2510 metadata,
2511 } => HydroNode::ExternalInput {
2512 from_external_key: *from_external_key,
2513 from_port_id: *from_port_id,
2514 from_many: *from_many,
2515 codec_type: codec_type.clone(),
2516 port_hint: *port_hint,
2517 instantiate_fn: instantiate_fn.clone(),
2518 deserialize_fn: deserialize_fn.clone(),
2519 metadata: metadata.clone(),
2520 },
2521 HydroNode::Counter {
2522 tag,
2523 duration,
2524 prefix,
2525 input,
2526 metadata,
2527 } => HydroNode::Counter {
2528 tag: tag.clone(),
2529 duration: duration.clone(),
2530 prefix: prefix.clone(),
2531 input: Box::new(input.deep_clone(seen_tees)),
2532 metadata: metadata.clone(),
2533 },
2534 }
2535 }
2536
2537 #[cfg(feature = "build")]
2538 pub fn emit_core(
2539 &mut self,
2540 builders_or_callback: &mut BuildersOrCallback<
2541 impl FnMut(&mut HydroRoot, &mut usize),
2542 impl FnMut(&mut HydroNode, &mut usize),
2543 >,
2544 seen_tees: &mut SeenSharedNodes,
2545 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
2546 next_stmt_id: &mut usize,
2547 ) -> syn::Ident {
2548 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2549
2550 self.transform_bottom_up(
2551 &mut |node: &mut HydroNode| {
2552 let out_location = node.metadata().location_id.clone();
2553 match node {
2554 HydroNode::Placeholder => {
2555 panic!()
2556 }
2557
2558 HydroNode::Cast { .. } => {
2559 match builders_or_callback {
2562 BuildersOrCallback::Builders(_) => {}
2563 BuildersOrCallback::Callback(_, node_callback) => {
2564 node_callback(node, next_stmt_id);
2565 }
2566 }
2567
2568 *next_stmt_id += 1;
2569 }
2571
2572 HydroNode::ObserveNonDet {
2573 inner,
2574 trusted,
2575 metadata,
2576 ..
2577 } => {
2578 let inner_ident = ident_stack.pop().unwrap();
2579
2580 let observe_ident =
2581 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2582
2583 match builders_or_callback {
2584 BuildersOrCallback::Builders(graph_builders) => {
2585 graph_builders.observe_nondet(
2586 *trusted,
2587 &inner.metadata().location_id,
2588 inner_ident,
2589 &inner.metadata().collection_kind,
2590 &observe_ident,
2591 &metadata.collection_kind,
2592 &metadata.op,
2593 );
2594 }
2595 BuildersOrCallback::Callback(_, node_callback) => {
2596 node_callback(node, next_stmt_id);
2597 }
2598 }
2599
2600 *next_stmt_id += 1;
2601
2602 ident_stack.push(observe_ident);
2603 }
2604
2605 HydroNode::Batch {
2606 inner, metadata, ..
2607 } => {
2608 let inner_ident = ident_stack.pop().unwrap();
2609
2610 let batch_ident =
2611 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2612
2613 match builders_or_callback {
2614 BuildersOrCallback::Builders(graph_builders) => {
2615 graph_builders.batch(
2616 inner_ident,
2617 &inner.metadata().location_id,
2618 &inner.metadata().collection_kind,
2619 &batch_ident,
2620 &out_location,
2621 &metadata.op,
2622 );
2623 }
2624 BuildersOrCallback::Callback(_, node_callback) => {
2625 node_callback(node, next_stmt_id);
2626 }
2627 }
2628
2629 *next_stmt_id += 1;
2630
2631 ident_stack.push(batch_ident);
2632 }
2633
2634 HydroNode::YieldConcat { inner, .. } => {
2635 let inner_ident = ident_stack.pop().unwrap();
2636
2637 let yield_ident =
2638 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2639
2640 match builders_or_callback {
2641 BuildersOrCallback::Builders(graph_builders) => {
2642 graph_builders.yield_from_tick(
2643 inner_ident,
2644 &inner.metadata().location_id,
2645 &inner.metadata().collection_kind,
2646 &yield_ident,
2647 &out_location,
2648 );
2649 }
2650 BuildersOrCallback::Callback(_, node_callback) => {
2651 node_callback(node, next_stmt_id);
2652 }
2653 }
2654
2655 *next_stmt_id += 1;
2656
2657 ident_stack.push(yield_ident);
2658 }
2659
2660 HydroNode::BeginAtomic { inner, metadata } => {
2661 let inner_ident = ident_stack.pop().unwrap();
2662
2663 let begin_ident =
2664 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2665
2666 match builders_or_callback {
2667 BuildersOrCallback::Builders(graph_builders) => {
2668 graph_builders.begin_atomic(
2669 inner_ident,
2670 &inner.metadata().location_id,
2671 &inner.metadata().collection_kind,
2672 &begin_ident,
2673 &out_location,
2674 &metadata.op,
2675 );
2676 }
2677 BuildersOrCallback::Callback(_, node_callback) => {
2678 node_callback(node, next_stmt_id);
2679 }
2680 }
2681
2682 *next_stmt_id += 1;
2683
2684 ident_stack.push(begin_ident);
2685 }
2686
2687 HydroNode::EndAtomic { inner, .. } => {
2688 let inner_ident = ident_stack.pop().unwrap();
2689
2690 let end_ident =
2691 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2692
2693 match builders_or_callback {
2694 BuildersOrCallback::Builders(graph_builders) => {
2695 graph_builders.end_atomic(
2696 inner_ident,
2697 &inner.metadata().location_id,
2698 &inner.metadata().collection_kind,
2699 &end_ident,
2700 );
2701 }
2702 BuildersOrCallback::Callback(_, node_callback) => {
2703 node_callback(node, next_stmt_id);
2704 }
2705 }
2706
2707 *next_stmt_id += 1;
2708
2709 ident_stack.push(end_ident);
2710 }
2711
2712 HydroNode::Source {
2713 source, metadata, ..
2714 } => {
2715 if let HydroSource::ExternalNetwork() = source {
2716 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2717 } else {
2718 let source_ident =
2719 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2720
2721 let source_stmt = match source {
2722 HydroSource::Stream(expr) => {
2723 debug_assert!(metadata.location_id.is_top_level());
2724 parse_quote! {
2725 #source_ident = source_stream(#expr);
2726 }
2727 }
2728
2729 HydroSource::ExternalNetwork() => {
2730 unreachable!()
2731 }
2732
2733 HydroSource::Iter(expr) => {
2734 if metadata.location_id.is_top_level() {
2735 parse_quote! {
2736 #source_ident = source_iter(#expr);
2737 }
2738 } else {
2739 parse_quote! {
2741 #source_ident = source_iter(#expr) -> persist::<'static>();
2742 }
2743 }
2744 }
2745
2746 HydroSource::Spin() => {
2747 debug_assert!(metadata.location_id.is_top_level());
2748 parse_quote! {
2749 #source_ident = spin();
2750 }
2751 }
2752
2753 HydroSource::ClusterMembers(target_loc, state) => {
2754 debug_assert!(metadata.location_id.is_top_level());
2755
2756 let members_tee_ident = syn::Ident::new(
2757 &format!(
2758 "__cluster_members_tee_{}_{}",
2759 metadata.location_id.root().key(),
2760 target_loc.key(),
2761 ),
2762 Span::call_site(),
2763 );
2764
2765 match state {
2766 ClusterMembersState::Stream(d) => {
2767 parse_quote! {
2768 #members_tee_ident = source_stream(#d) -> tee();
2769 #source_ident = #members_tee_ident;
2770 }
2771 },
2772 ClusterMembersState::Uninit => syn::parse_quote! {
2773 #source_ident = source_stream(DUMMY);
2774 },
2775 ClusterMembersState::Tee(..) => parse_quote! {
2776 #source_ident = #members_tee_ident;
2777 },
2778 }
2779 }
2780
2781 HydroSource::Embedded(ident) => {
2782 parse_quote! {
2783 #source_ident = source_stream(#ident);
2784 }
2785 }
2786
2787 HydroSource::EmbeddedSingleton(ident) => {
2788 parse_quote! {
2789 #source_ident = source_iter([#ident]);
2790 }
2791 }
2792 };
2793
2794 match builders_or_callback {
2795 BuildersOrCallback::Builders(graph_builders) => {
2796 let builder = graph_builders.get_dfir_mut(&out_location);
2797 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2798 }
2799 BuildersOrCallback::Callback(_, node_callback) => {
2800 node_callback(node, next_stmt_id);
2801 }
2802 }
2803
2804 *next_stmt_id += 1;
2805
2806 ident_stack.push(source_ident);
2807 }
2808 }
2809
2810 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
2811 let source_ident =
2812 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2813
2814 match builders_or_callback {
2815 BuildersOrCallback::Builders(graph_builders) => {
2816 let builder = graph_builders.get_dfir_mut(&out_location);
2817
2818 if *first_tick_only {
2819 assert!(
2820 !metadata.location_id.is_top_level(),
2821 "first_tick_only SingletonSource must be inside a tick"
2822 );
2823 }
2824
2825 if *first_tick_only
2826 || (metadata.location_id.is_top_level()
2827 && metadata.collection_kind.is_bounded())
2828 {
2829 builder.add_dfir(
2830 parse_quote! {
2831 #source_ident = source_iter([#value]);
2832 },
2833 None,
2834 Some(&next_stmt_id.to_string()),
2835 );
2836 } else {
2837 builder.add_dfir(
2838 parse_quote! {
2839 #source_ident = source_iter([#value]) -> persist::<'static>();
2840 },
2841 None,
2842 Some(&next_stmt_id.to_string()),
2843 );
2844 }
2845 }
2846 BuildersOrCallback::Callback(_, node_callback) => {
2847 node_callback(node, next_stmt_id);
2848 }
2849 }
2850
2851 *next_stmt_id += 1;
2852
2853 ident_stack.push(source_ident);
2854 }
2855
2856 HydroNode::CycleSource { cycle_id, .. } => {
2857 let ident = cycle_id.as_ident();
2858
2859 match builders_or_callback {
2860 BuildersOrCallback::Builders(_) => {}
2861 BuildersOrCallback::Callback(_, node_callback) => {
2862 node_callback(node, next_stmt_id);
2863 }
2864 }
2865
2866 *next_stmt_id += 1;
2868
2869 ident_stack.push(ident);
2870 }
2871
2872 HydroNode::Tee { inner, .. } => {
2873 let ret_ident = if let Some(built_idents) =
2874 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2875 {
2876 match builders_or_callback {
2877 BuildersOrCallback::Builders(_) => {}
2878 BuildersOrCallback::Callback(_, node_callback) => {
2879 node_callback(node, next_stmt_id);
2880 }
2881 }
2882
2883 built_idents[0].clone()
2884 } else {
2885 let inner_ident = ident_stack.pop().unwrap();
2888
2889 let tee_ident =
2890 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2891
2892 built_tees.insert(
2893 inner.0.as_ref() as *const RefCell<HydroNode>,
2894 vec![tee_ident.clone()],
2895 );
2896
2897 match builders_or_callback {
2898 BuildersOrCallback::Builders(graph_builders) => {
2899 let builder = graph_builders.get_dfir_mut(&out_location);
2900 builder.add_dfir(
2901 parse_quote! {
2902 #tee_ident = #inner_ident -> tee();
2903 },
2904 None,
2905 Some(&next_stmt_id.to_string()),
2906 );
2907 }
2908 BuildersOrCallback::Callback(_, node_callback) => {
2909 node_callback(node, next_stmt_id);
2910 }
2911 }
2912
2913 tee_ident
2914 };
2915
2916 *next_stmt_id += 1;
2920 ident_stack.push(ret_ident);
2921 }
2922
2923 HydroNode::Partition {
2924 inner, f, is_true, ..
2925 } => {
2926 let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
2928 let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
2929 match builders_or_callback {
2930 BuildersOrCallback::Builders(_) => {}
2931 BuildersOrCallback::Callback(_, node_callback) => {
2932 node_callback(node, next_stmt_id);
2933 }
2934 }
2935
2936 let idx = if is_true { 0 } else { 1 };
2937 built_idents[idx].clone()
2938 } else {
2939 let inner_ident = ident_stack.pop().unwrap();
2942
2943 let partition_ident = syn::Ident::new(
2944 &format!("stream_{}_partition", *next_stmt_id),
2945 Span::call_site(),
2946 );
2947 let true_ident = syn::Ident::new(
2948 &format!("stream_{}_true", *next_stmt_id),
2949 Span::call_site(),
2950 );
2951 let false_ident = syn::Ident::new(
2952 &format!("stream_{}_false", *next_stmt_id),
2953 Span::call_site(),
2954 );
2955
2956 built_tees.insert(
2957 ptr,
2958 vec![true_ident.clone(), false_ident.clone()],
2959 );
2960
2961 match builders_or_callback {
2962 BuildersOrCallback::Builders(graph_builders) => {
2963 let builder = graph_builders.get_dfir_mut(&out_location);
2964 builder.add_dfir(
2965 parse_quote! {
2966 #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f)(__item) { 0_usize } else { 1_usize });
2967 #true_ident = #partition_ident[0];
2968 #false_ident = #partition_ident[1];
2969 },
2970 None,
2971 Some(&next_stmt_id.to_string()),
2972 );
2973 }
2974 BuildersOrCallback::Callback(_, node_callback) => {
2975 node_callback(node, next_stmt_id);
2976 }
2977 }
2978
2979 if is_true { true_ident } else { false_ident }
2980 };
2981
2982 *next_stmt_id += 1;
2983 ident_stack.push(ret_ident);
2984 }
2985
2986 HydroNode::Chain { .. } => {
2987 let second_ident = ident_stack.pop().unwrap();
2989 let first_ident = ident_stack.pop().unwrap();
2990
2991 let chain_ident =
2992 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2993
2994 match builders_or_callback {
2995 BuildersOrCallback::Builders(graph_builders) => {
2996 let builder = graph_builders.get_dfir_mut(&out_location);
2997 builder.add_dfir(
2998 parse_quote! {
2999 #chain_ident = chain();
3000 #first_ident -> [0]#chain_ident;
3001 #second_ident -> [1]#chain_ident;
3002 },
3003 None,
3004 Some(&next_stmt_id.to_string()),
3005 );
3006 }
3007 BuildersOrCallback::Callback(_, node_callback) => {
3008 node_callback(node, next_stmt_id);
3009 }
3010 }
3011
3012 *next_stmt_id += 1;
3013
3014 ident_stack.push(chain_ident);
3015 }
3016
3017 HydroNode::ChainFirst { .. } => {
3018 let second_ident = ident_stack.pop().unwrap();
3019 let first_ident = ident_stack.pop().unwrap();
3020
3021 let chain_ident =
3022 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3023
3024 match builders_or_callback {
3025 BuildersOrCallback::Builders(graph_builders) => {
3026 let builder = graph_builders.get_dfir_mut(&out_location);
3027 builder.add_dfir(
3028 parse_quote! {
3029 #chain_ident = chain_first_n(1);
3030 #first_ident -> [0]#chain_ident;
3031 #second_ident -> [1]#chain_ident;
3032 },
3033 None,
3034 Some(&next_stmt_id.to_string()),
3035 );
3036 }
3037 BuildersOrCallback::Callback(_, node_callback) => {
3038 node_callback(node, next_stmt_id);
3039 }
3040 }
3041
3042 *next_stmt_id += 1;
3043
3044 ident_stack.push(chain_ident);
3045 }
3046
3047 HydroNode::CrossSingleton { right, .. } => {
3048 let right_ident = ident_stack.pop().unwrap();
3049 let left_ident = ident_stack.pop().unwrap();
3050
3051 let cross_ident =
3052 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3053
3054 match builders_or_callback {
3055 BuildersOrCallback::Builders(graph_builders) => {
3056 let builder = graph_builders.get_dfir_mut(&out_location);
3057
3058 if right.metadata().location_id.is_top_level()
3059 && right.metadata().collection_kind.is_bounded()
3060 {
3061 builder.add_dfir(
3062 parse_quote! {
3063 #cross_ident = cross_singleton();
3064 #left_ident -> [input]#cross_ident;
3065 #right_ident -> persist::<'static>() -> [single]#cross_ident;
3066 },
3067 None,
3068 Some(&next_stmt_id.to_string()),
3069 );
3070 } else {
3071 builder.add_dfir(
3072 parse_quote! {
3073 #cross_ident = cross_singleton();
3074 #left_ident -> [input]#cross_ident;
3075 #right_ident -> [single]#cross_ident;
3076 },
3077 None,
3078 Some(&next_stmt_id.to_string()),
3079 );
3080 }
3081 }
3082 BuildersOrCallback::Callback(_, node_callback) => {
3083 node_callback(node, next_stmt_id);
3084 }
3085 }
3086
3087 *next_stmt_id += 1;
3088
3089 ident_stack.push(cross_ident);
3090 }
3091
3092 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
3093 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
3094 parse_quote!(cross_join_multiset)
3095 } else {
3096 parse_quote!(join_multiset)
3097 };
3098
3099 let (HydroNode::CrossProduct { left, right, .. }
3100 | HydroNode::Join { left, right, .. }) = node
3101 else {
3102 unreachable!()
3103 };
3104
3105 let is_top_level = left.metadata().location_id.is_top_level()
3106 && right.metadata().location_id.is_top_level();
3107 let left_lifetime = if left.metadata().location_id.is_top_level() {
3108 quote!('static)
3109 } else {
3110 quote!('tick)
3111 };
3112
3113 let right_lifetime = if right.metadata().location_id.is_top_level() {
3114 quote!('static)
3115 } else {
3116 quote!('tick)
3117 };
3118
3119 let right_ident = ident_stack.pop().unwrap();
3120 let left_ident = ident_stack.pop().unwrap();
3121
3122 let stream_ident =
3123 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3124
3125 match builders_or_callback {
3126 BuildersOrCallback::Builders(graph_builders) => {
3127 let builder = graph_builders.get_dfir_mut(&out_location);
3128 builder.add_dfir(
3129 if is_top_level {
3130 parse_quote! {
3133 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
3134 #left_ident -> [0]#stream_ident;
3135 #right_ident -> [1]#stream_ident;
3136 }
3137 } else {
3138 parse_quote! {
3139 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
3140 #left_ident -> [0]#stream_ident;
3141 #right_ident -> [1]#stream_ident;
3142 }
3143 }
3144 ,
3145 None,
3146 Some(&next_stmt_id.to_string()),
3147 );
3148 }
3149 BuildersOrCallback::Callback(_, node_callback) => {
3150 node_callback(node, next_stmt_id);
3151 }
3152 }
3153
3154 *next_stmt_id += 1;
3155
3156 ident_stack.push(stream_ident);
3157 }
3158
3159 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
3160 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
3161 parse_quote!(difference)
3162 } else {
3163 parse_quote!(anti_join)
3164 };
3165
3166 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
3167 node
3168 else {
3169 unreachable!()
3170 };
3171
3172 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
3173 quote!('static)
3174 } else {
3175 quote!('tick)
3176 };
3177
3178 let neg_ident = ident_stack.pop().unwrap();
3179 let pos_ident = ident_stack.pop().unwrap();
3180
3181 let stream_ident =
3182 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3183
3184 match builders_or_callback {
3185 BuildersOrCallback::Builders(graph_builders) => {
3186 let builder = graph_builders.get_dfir_mut(&out_location);
3187 builder.add_dfir(
3188 parse_quote! {
3189 #stream_ident = #operator::<'tick, #neg_lifetime>();
3190 #pos_ident -> [pos]#stream_ident;
3191 #neg_ident -> [neg]#stream_ident;
3192 },
3193 None,
3194 Some(&next_stmt_id.to_string()),
3195 );
3196 }
3197 BuildersOrCallback::Callback(_, node_callback) => {
3198 node_callback(node, next_stmt_id);
3199 }
3200 }
3201
3202 *next_stmt_id += 1;
3203
3204 ident_stack.push(stream_ident);
3205 }
3206
3207 HydroNode::ResolveFutures { .. } => {
3208 let input_ident = ident_stack.pop().unwrap();
3209
3210 let futures_ident =
3211 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3212
3213 match builders_or_callback {
3214 BuildersOrCallback::Builders(graph_builders) => {
3215 let builder = graph_builders.get_dfir_mut(&out_location);
3216 builder.add_dfir(
3217 parse_quote! {
3218 #futures_ident = #input_ident -> resolve_futures();
3219 },
3220 None,
3221 Some(&next_stmt_id.to_string()),
3222 );
3223 }
3224 BuildersOrCallback::Callback(_, node_callback) => {
3225 node_callback(node, next_stmt_id);
3226 }
3227 }
3228
3229 *next_stmt_id += 1;
3230
3231 ident_stack.push(futures_ident);
3232 }
3233
3234 HydroNode::ResolveFuturesBlocking { .. } => {
3235 let input_ident = ident_stack.pop().unwrap();
3236
3237 let futures_ident =
3238 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3239
3240 match builders_or_callback {
3241 BuildersOrCallback::Builders(graph_builders) => {
3242 let builder = graph_builders.get_dfir_mut(&out_location);
3243 builder.add_dfir(
3244 parse_quote! {
3245 #futures_ident = #input_ident -> resolve_futures_blocking();
3246 },
3247 None,
3248 Some(&next_stmt_id.to_string()),
3249 );
3250 }
3251 BuildersOrCallback::Callback(_, node_callback) => {
3252 node_callback(node, next_stmt_id);
3253 }
3254 }
3255
3256 *next_stmt_id += 1;
3257
3258 ident_stack.push(futures_ident);
3259 }
3260
3261 HydroNode::ResolveFuturesOrdered { .. } => {
3262 let input_ident = ident_stack.pop().unwrap();
3263
3264 let futures_ident =
3265 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3266
3267 match builders_or_callback {
3268 BuildersOrCallback::Builders(graph_builders) => {
3269 let builder = graph_builders.get_dfir_mut(&out_location);
3270 builder.add_dfir(
3271 parse_quote! {
3272 #futures_ident = #input_ident -> resolve_futures_ordered();
3273 },
3274 None,
3275 Some(&next_stmt_id.to_string()),
3276 );
3277 }
3278 BuildersOrCallback::Callback(_, node_callback) => {
3279 node_callback(node, next_stmt_id);
3280 }
3281 }
3282
3283 *next_stmt_id += 1;
3284
3285 ident_stack.push(futures_ident);
3286 }
3287
3288 HydroNode::Map { f, .. } => {
3289 let input_ident = ident_stack.pop().unwrap();
3290
3291 let map_ident =
3292 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3293
3294 match builders_or_callback {
3295 BuildersOrCallback::Builders(graph_builders) => {
3296 let builder = graph_builders.get_dfir_mut(&out_location);
3297 builder.add_dfir(
3298 parse_quote! {
3299 #map_ident = #input_ident -> map(#f);
3300 },
3301 None,
3302 Some(&next_stmt_id.to_string()),
3303 );
3304 }
3305 BuildersOrCallback::Callback(_, node_callback) => {
3306 node_callback(node, next_stmt_id);
3307 }
3308 }
3309
3310 *next_stmt_id += 1;
3311
3312 ident_stack.push(map_ident);
3313 }
3314
3315 HydroNode::FlatMap { f, .. } => {
3316 let input_ident = ident_stack.pop().unwrap();
3317
3318 let flat_map_ident =
3319 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3320
3321 match builders_or_callback {
3322 BuildersOrCallback::Builders(graph_builders) => {
3323 let builder = graph_builders.get_dfir_mut(&out_location);
3324 builder.add_dfir(
3325 parse_quote! {
3326 #flat_map_ident = #input_ident -> flat_map(#f);
3327 },
3328 None,
3329 Some(&next_stmt_id.to_string()),
3330 );
3331 }
3332 BuildersOrCallback::Callback(_, node_callback) => {
3333 node_callback(node, next_stmt_id);
3334 }
3335 }
3336
3337 *next_stmt_id += 1;
3338
3339 ident_stack.push(flat_map_ident);
3340 }
3341
3342 HydroNode::FlatMapStreamBlocking { f, .. } => {
3343 let input_ident = ident_stack.pop().unwrap();
3344
3345 let flat_map_stream_blocking_ident =
3346 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3347
3348 match builders_or_callback {
3349 BuildersOrCallback::Builders(graph_builders) => {
3350 let builder = graph_builders.get_dfir_mut(&out_location);
3351 builder.add_dfir(
3352 parse_quote! {
3353 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f);
3354 },
3355 None,
3356 Some(&next_stmt_id.to_string()),
3357 );
3358 }
3359 BuildersOrCallback::Callback(_, node_callback) => {
3360 node_callback(node, next_stmt_id);
3361 }
3362 }
3363
3364 *next_stmt_id += 1;
3365
3366 ident_stack.push(flat_map_stream_blocking_ident);
3367 }
3368
3369 HydroNode::Filter { f, .. } => {
3370 let input_ident = ident_stack.pop().unwrap();
3371
3372 let filter_ident =
3373 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3374
3375 match builders_or_callback {
3376 BuildersOrCallback::Builders(graph_builders) => {
3377 let builder = graph_builders.get_dfir_mut(&out_location);
3378 builder.add_dfir(
3379 parse_quote! {
3380 #filter_ident = #input_ident -> filter(#f);
3381 },
3382 None,
3383 Some(&next_stmt_id.to_string()),
3384 );
3385 }
3386 BuildersOrCallback::Callback(_, node_callback) => {
3387 node_callback(node, next_stmt_id);
3388 }
3389 }
3390
3391 *next_stmt_id += 1;
3392
3393 ident_stack.push(filter_ident);
3394 }
3395
3396 HydroNode::FilterMap { f, .. } => {
3397 let input_ident = ident_stack.pop().unwrap();
3398
3399 let filter_map_ident =
3400 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3401
3402 match builders_or_callback {
3403 BuildersOrCallback::Builders(graph_builders) => {
3404 let builder = graph_builders.get_dfir_mut(&out_location);
3405 builder.add_dfir(
3406 parse_quote! {
3407 #filter_map_ident = #input_ident -> filter_map(#f);
3408 },
3409 None,
3410 Some(&next_stmt_id.to_string()),
3411 );
3412 }
3413 BuildersOrCallback::Callback(_, node_callback) => {
3414 node_callback(node, next_stmt_id);
3415 }
3416 }
3417
3418 *next_stmt_id += 1;
3419
3420 ident_stack.push(filter_map_ident);
3421 }
3422
3423 HydroNode::Sort { .. } => {
3424 let input_ident = ident_stack.pop().unwrap();
3425
3426 let sort_ident =
3427 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3428
3429 match builders_or_callback {
3430 BuildersOrCallback::Builders(graph_builders) => {
3431 let builder = graph_builders.get_dfir_mut(&out_location);
3432 builder.add_dfir(
3433 parse_quote! {
3434 #sort_ident = #input_ident -> sort();
3435 },
3436 None,
3437 Some(&next_stmt_id.to_string()),
3438 );
3439 }
3440 BuildersOrCallback::Callback(_, node_callback) => {
3441 node_callback(node, next_stmt_id);
3442 }
3443 }
3444
3445 *next_stmt_id += 1;
3446
3447 ident_stack.push(sort_ident);
3448 }
3449
3450 HydroNode::DeferTick { .. } => {
3451 let input_ident = ident_stack.pop().unwrap();
3452
3453 let defer_tick_ident =
3454 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3455
3456 match builders_or_callback {
3457 BuildersOrCallback::Builders(graph_builders) => {
3458 let builder = graph_builders.get_dfir_mut(&out_location);
3459 builder.add_dfir(
3460 parse_quote! {
3461 #defer_tick_ident = #input_ident -> defer_tick_lazy();
3462 },
3463 None,
3464 Some(&next_stmt_id.to_string()),
3465 );
3466 }
3467 BuildersOrCallback::Callback(_, node_callback) => {
3468 node_callback(node, next_stmt_id);
3469 }
3470 }
3471
3472 *next_stmt_id += 1;
3473
3474 ident_stack.push(defer_tick_ident);
3475 }
3476
3477 HydroNode::Enumerate { input, .. } => {
3478 let input_ident = ident_stack.pop().unwrap();
3479
3480 let enumerate_ident =
3481 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3482
3483 match builders_or_callback {
3484 BuildersOrCallback::Builders(graph_builders) => {
3485 let builder = graph_builders.get_dfir_mut(&out_location);
3486 let lifetime = if input.metadata().location_id.is_top_level() {
3487 quote!('static)
3488 } else {
3489 quote!('tick)
3490 };
3491 builder.add_dfir(
3492 parse_quote! {
3493 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3494 },
3495 None,
3496 Some(&next_stmt_id.to_string()),
3497 );
3498 }
3499 BuildersOrCallback::Callback(_, node_callback) => {
3500 node_callback(node, next_stmt_id);
3501 }
3502 }
3503
3504 *next_stmt_id += 1;
3505
3506 ident_stack.push(enumerate_ident);
3507 }
3508
3509 HydroNode::Inspect { f, .. } => {
3510 let input_ident = ident_stack.pop().unwrap();
3511
3512 let inspect_ident =
3513 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3514
3515 match builders_or_callback {
3516 BuildersOrCallback::Builders(graph_builders) => {
3517 let builder = graph_builders.get_dfir_mut(&out_location);
3518 builder.add_dfir(
3519 parse_quote! {
3520 #inspect_ident = #input_ident -> inspect(#f);
3521 },
3522 None,
3523 Some(&next_stmt_id.to_string()),
3524 );
3525 }
3526 BuildersOrCallback::Callback(_, node_callback) => {
3527 node_callback(node, next_stmt_id);
3528 }
3529 }
3530
3531 *next_stmt_id += 1;
3532
3533 ident_stack.push(inspect_ident);
3534 }
3535
3536 HydroNode::Unique { input, .. } => {
3537 let input_ident = ident_stack.pop().unwrap();
3538
3539 let unique_ident =
3540 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3541
3542 match builders_or_callback {
3543 BuildersOrCallback::Builders(graph_builders) => {
3544 let builder = graph_builders.get_dfir_mut(&out_location);
3545 let lifetime = if input.metadata().location_id.is_top_level() {
3546 quote!('static)
3547 } else {
3548 quote!('tick)
3549 };
3550
3551 builder.add_dfir(
3552 parse_quote! {
3553 #unique_ident = #input_ident -> unique::<#lifetime>();
3554 },
3555 None,
3556 Some(&next_stmt_id.to_string()),
3557 );
3558 }
3559 BuildersOrCallback::Callback(_, node_callback) => {
3560 node_callback(node, next_stmt_id);
3561 }
3562 }
3563
3564 *next_stmt_id += 1;
3565
3566 ident_stack.push(unique_ident);
3567 }
3568
3569 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3570 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3571 if input.metadata().location_id.is_top_level()
3572 && input.metadata().collection_kind.is_bounded()
3573 {
3574 parse_quote!(fold_no_replay)
3575 } else {
3576 parse_quote!(fold)
3577 }
3578 } else if matches!(node, HydroNode::Scan { .. }) {
3579 parse_quote!(scan)
3580 } else if let HydroNode::FoldKeyed { input, .. } = node {
3581 if input.metadata().location_id.is_top_level()
3582 && input.metadata().collection_kind.is_bounded()
3583 {
3584 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3585 } else {
3586 parse_quote!(fold_keyed)
3587 }
3588 } else {
3589 unreachable!()
3590 };
3591
3592 let (HydroNode::Fold { input, .. }
3593 | HydroNode::FoldKeyed { input, .. }
3594 | HydroNode::Scan { input, .. }) = node
3595 else {
3596 unreachable!()
3597 };
3598
3599 let lifetime = if input.metadata().location_id.is_top_level() {
3600 quote!('static)
3601 } else {
3602 quote!('tick)
3603 };
3604
3605 let input_ident = ident_stack.pop().unwrap();
3606
3607 let (HydroNode::Fold { init, acc, .. }
3608 | HydroNode::FoldKeyed { init, acc, .. }
3609 | HydroNode::Scan { init, acc, .. }) = &*node
3610 else {
3611 unreachable!()
3612 };
3613
3614 let fold_ident =
3615 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3616
3617 match builders_or_callback {
3618 BuildersOrCallback::Builders(graph_builders) => {
3619 if matches!(node, HydroNode::Fold { .. })
3620 && node.metadata().location_id.is_top_level()
3621 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3622 && graph_builders.singleton_intermediates()
3623 && !node.metadata().collection_kind.is_bounded()
3624 {
3625 let builder = graph_builders.get_dfir_mut(&out_location);
3626
3627 let acc: syn::Expr = parse_quote!({
3628 let mut __inner = #acc;
3629 move |__state, __value| {
3630 __inner(__state, __value);
3631 Some(__state.clone())
3632 }
3633 });
3634
3635 builder.add_dfir(
3636 parse_quote! {
3637 source_iter([(#init)()]) -> [0]#fold_ident;
3638 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3639 #fold_ident = chain();
3640 },
3641 None,
3642 Some(&next_stmt_id.to_string()),
3643 );
3644 } else if matches!(node, HydroNode::FoldKeyed { .. })
3645 && node.metadata().location_id.is_top_level()
3646 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3647 && graph_builders.singleton_intermediates()
3648 && !node.metadata().collection_kind.is_bounded()
3649 {
3650 let builder = graph_builders.get_dfir_mut(&out_location);
3651
3652 let acc: syn::Expr = parse_quote!({
3653 let mut __init = #init;
3654 let mut __inner = #acc;
3655 move |__state, __kv: (_, _)| {
3656 let __state = __state
3658 .entry(::std::clone::Clone::clone(&__kv.0))
3659 .or_insert_with(|| (__init)());
3660 __inner(__state, __kv.1);
3661 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3662 }
3663 });
3664
3665 builder.add_dfir(
3666 parse_quote! {
3667 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3668 },
3669 None,
3670 Some(&next_stmt_id.to_string()),
3671 );
3672 } else {
3673 let builder = graph_builders.get_dfir_mut(&out_location);
3674 builder.add_dfir(
3675 parse_quote! {
3676 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3677 },
3678 None,
3679 Some(&next_stmt_id.to_string()),
3680 );
3681 }
3682 }
3683 BuildersOrCallback::Callback(_, node_callback) => {
3684 node_callback(node, next_stmt_id);
3685 }
3686 }
3687
3688 *next_stmt_id += 1;
3689
3690 ident_stack.push(fold_ident);
3691 }
3692
3693 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3694 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3695 if input.metadata().location_id.is_top_level()
3696 && input.metadata().collection_kind.is_bounded()
3697 {
3698 parse_quote!(reduce_no_replay)
3699 } else {
3700 parse_quote!(reduce)
3701 }
3702 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3703 if input.metadata().location_id.is_top_level()
3704 && input.metadata().collection_kind.is_bounded()
3705 {
3706 todo!(
3707 "Calling keyed reduce on a top-level bounded collection is not supported"
3708 )
3709 } else {
3710 parse_quote!(reduce_keyed)
3711 }
3712 } else {
3713 unreachable!()
3714 };
3715
3716 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3717 else {
3718 unreachable!()
3719 };
3720
3721 let lifetime = if input.metadata().location_id.is_top_level() {
3722 quote!('static)
3723 } else {
3724 quote!('tick)
3725 };
3726
3727 let input_ident = ident_stack.pop().unwrap();
3728
3729 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3730 else {
3731 unreachable!()
3732 };
3733
3734 let reduce_ident =
3735 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3736
3737 match builders_or_callback {
3738 BuildersOrCallback::Builders(graph_builders) => {
3739 if matches!(node, HydroNode::Reduce { .. })
3740 && node.metadata().location_id.is_top_level()
3741 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3742 && graph_builders.singleton_intermediates()
3743 && !node.metadata().collection_kind.is_bounded()
3744 {
3745 todo!(
3746 "Reduce with optional intermediates is not yet supported in simulator"
3747 );
3748 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3749 && node.metadata().location_id.is_top_level()
3750 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3751 && graph_builders.singleton_intermediates()
3752 && !node.metadata().collection_kind.is_bounded()
3753 {
3754 todo!(
3755 "Reduce keyed with optional intermediates is not yet supported in simulator"
3756 );
3757 } else {
3758 let builder = graph_builders.get_dfir_mut(&out_location);
3759 builder.add_dfir(
3760 parse_quote! {
3761 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3762 },
3763 None,
3764 Some(&next_stmt_id.to_string()),
3765 );
3766 }
3767 }
3768 BuildersOrCallback::Callback(_, node_callback) => {
3769 node_callback(node, next_stmt_id);
3770 }
3771 }
3772
3773 *next_stmt_id += 1;
3774
3775 ident_stack.push(reduce_ident);
3776 }
3777
3778 HydroNode::ReduceKeyedWatermark {
3779 f,
3780 input,
3781 metadata,
3782 ..
3783 } => {
3784 let lifetime = if input.metadata().location_id.is_top_level() {
3785 quote!('static)
3786 } else {
3787 quote!('tick)
3788 };
3789
3790 let watermark_ident = ident_stack.pop().unwrap();
3792 let input_ident = ident_stack.pop().unwrap();
3793
3794 let chain_ident = syn::Ident::new(
3795 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3796 Span::call_site(),
3797 );
3798
3799 let fold_ident =
3800 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3801
3802 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3803 && input.metadata().collection_kind.is_bounded()
3804 {
3805 parse_quote!(fold_no_replay)
3806 } else {
3807 parse_quote!(fold)
3808 };
3809
3810 match builders_or_callback {
3811 BuildersOrCallback::Builders(graph_builders) => {
3812 if metadata.location_id.is_top_level()
3813 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3814 && graph_builders.singleton_intermediates()
3815 && !metadata.collection_kind.is_bounded()
3816 {
3817 todo!(
3818 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3819 )
3820 } else {
3821 let builder = graph_builders.get_dfir_mut(&out_location);
3822 builder.add_dfir(
3823 parse_quote! {
3824 #chain_ident = chain();
3825 #input_ident
3826 -> map(|x| (Some(x), None))
3827 -> [0]#chain_ident;
3828 #watermark_ident
3829 -> map(|watermark| (None, Some(watermark)))
3830 -> [1]#chain_ident;
3831
3832 #fold_ident = #chain_ident
3833 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3834 let __reduce_keyed_fn = #f;
3835 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3836 if let Some((k, v)) = opt_payload {
3837 if let Some(curr_watermark) = *opt_curr_watermark {
3838 if k < curr_watermark {
3839 return;
3840 }
3841 }
3842 match map.entry(k) {
3843 ::std::collections::hash_map::Entry::Vacant(e) => {
3844 e.insert(v);
3845 }
3846 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3847 __reduce_keyed_fn(e.get_mut(), v);
3848 }
3849 }
3850 } else {
3851 let watermark = opt_watermark.unwrap();
3852 if let Some(curr_watermark) = *opt_curr_watermark {
3853 if watermark <= curr_watermark {
3854 return;
3855 }
3856 }
3857 *opt_curr_watermark = opt_watermark;
3858 map.retain(|k, _| *k >= watermark);
3859 }
3860 }
3861 })
3862 -> flat_map(|(map, _curr_watermark)| map);
3863 },
3864 None,
3865 Some(&next_stmt_id.to_string()),
3866 );
3867 }
3868 }
3869 BuildersOrCallback::Callback(_, node_callback) => {
3870 node_callback(node, next_stmt_id);
3871 }
3872 }
3873
3874 *next_stmt_id += 1;
3875
3876 ident_stack.push(fold_ident);
3877 }
3878
3879 HydroNode::Network {
3880 networking_info,
3881 serialize_fn: serialize_pipeline,
3882 instantiate_fn,
3883 deserialize_fn: deserialize_pipeline,
3884 input,
3885 ..
3886 } => {
3887 let input_ident = ident_stack.pop().unwrap();
3888
3889 let receiver_stream_ident =
3890 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3891
3892 match builders_or_callback {
3893 BuildersOrCallback::Builders(graph_builders) => {
3894 let (sink_expr, source_expr) = match instantiate_fn {
3895 DebugInstantiate::Building => (
3896 syn::parse_quote!(DUMMY_SINK),
3897 syn::parse_quote!(DUMMY_SOURCE),
3898 ),
3899
3900 DebugInstantiate::Finalized(finalized) => {
3901 (finalized.sink.clone(), finalized.source.clone())
3902 }
3903 };
3904
3905 graph_builders.create_network(
3906 &input.metadata().location_id,
3907 &out_location,
3908 input_ident,
3909 &receiver_stream_ident,
3910 serialize_pipeline.as_ref(),
3911 sink_expr,
3912 source_expr,
3913 deserialize_pipeline.as_ref(),
3914 *next_stmt_id,
3915 networking_info,
3916 );
3917 }
3918 BuildersOrCallback::Callback(_, node_callback) => {
3919 node_callback(node, next_stmt_id);
3920 }
3921 }
3922
3923 *next_stmt_id += 1;
3924
3925 ident_stack.push(receiver_stream_ident);
3926 }
3927
3928 HydroNode::ExternalInput {
3929 instantiate_fn,
3930 deserialize_fn: deserialize_pipeline,
3931 ..
3932 } => {
3933 let receiver_stream_ident =
3934 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3935
3936 match builders_or_callback {
3937 BuildersOrCallback::Builders(graph_builders) => {
3938 let (_, source_expr) = match instantiate_fn {
3939 DebugInstantiate::Building => (
3940 syn::parse_quote!(DUMMY_SINK),
3941 syn::parse_quote!(DUMMY_SOURCE),
3942 ),
3943
3944 DebugInstantiate::Finalized(finalized) => {
3945 (finalized.sink.clone(), finalized.source.clone())
3946 }
3947 };
3948
3949 graph_builders.create_external_source(
3950 &out_location,
3951 source_expr,
3952 &receiver_stream_ident,
3953 deserialize_pipeline.as_ref(),
3954 *next_stmt_id,
3955 );
3956 }
3957 BuildersOrCallback::Callback(_, node_callback) => {
3958 node_callback(node, next_stmt_id);
3959 }
3960 }
3961
3962 *next_stmt_id += 1;
3963
3964 ident_stack.push(receiver_stream_ident);
3965 }
3966
3967 HydroNode::Counter {
3968 tag,
3969 duration,
3970 prefix,
3971 ..
3972 } => {
3973 let input_ident = ident_stack.pop().unwrap();
3974
3975 let counter_ident =
3976 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3977
3978 match builders_or_callback {
3979 BuildersOrCallback::Builders(graph_builders) => {
3980 let arg = format!("{}({})", prefix, tag);
3981 let builder = graph_builders.get_dfir_mut(&out_location);
3982 builder.add_dfir(
3983 parse_quote! {
3984 #counter_ident = #input_ident -> _counter(#arg, #duration);
3985 },
3986 None,
3987 Some(&next_stmt_id.to_string()),
3988 );
3989 }
3990 BuildersOrCallback::Callback(_, node_callback) => {
3991 node_callback(node, next_stmt_id);
3992 }
3993 }
3994
3995 *next_stmt_id += 1;
3996
3997 ident_stack.push(counter_ident);
3998 }
3999 }
4000 },
4001 seen_tees,
4002 false,
4003 );
4004
4005 ident_stack
4006 .pop()
4007 .expect("ident_stack should have exactly one element after traversal")
4008 }
4009
4010 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
4011 match self {
4012 HydroNode::Placeholder => {
4013 panic!()
4014 }
4015 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
4016 HydroNode::Source { source, .. } => match source {
4017 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
4018 HydroSource::ExternalNetwork()
4019 | HydroSource::Spin()
4020 | HydroSource::ClusterMembers(_, _)
4021 | HydroSource::Embedded(_)
4022 | HydroSource::EmbeddedSingleton(_) => {} },
4024 HydroNode::SingletonSource { value, .. } => {
4025 transform(value);
4026 }
4027 HydroNode::CycleSource { .. }
4028 | HydroNode::Tee { .. }
4029 | HydroNode::YieldConcat { .. }
4030 | HydroNode::BeginAtomic { .. }
4031 | HydroNode::EndAtomic { .. }
4032 | HydroNode::Batch { .. }
4033 | HydroNode::Chain { .. }
4034 | HydroNode::ChainFirst { .. }
4035 | HydroNode::CrossProduct { .. }
4036 | HydroNode::CrossSingleton { .. }
4037 | HydroNode::ResolveFutures { .. }
4038 | HydroNode::ResolveFuturesBlocking { .. }
4039 | HydroNode::ResolveFuturesOrdered { .. }
4040 | HydroNode::Join { .. }
4041 | HydroNode::Difference { .. }
4042 | HydroNode::AntiJoin { .. }
4043 | HydroNode::DeferTick { .. }
4044 | HydroNode::Enumerate { .. }
4045 | HydroNode::Unique { .. }
4046 | HydroNode::Sort { .. } => {}
4047 HydroNode::Map { f, .. }
4048 | HydroNode::FlatMap { f, .. }
4049 | HydroNode::FlatMapStreamBlocking { f, .. }
4050 | HydroNode::Filter { f, .. }
4051 | HydroNode::FilterMap { f, .. }
4052 | HydroNode::Inspect { f, .. }
4053 | HydroNode::Partition { f, .. }
4054 | HydroNode::Reduce { f, .. }
4055 | HydroNode::ReduceKeyed { f, .. }
4056 | HydroNode::ReduceKeyedWatermark { f, .. } => {
4057 transform(f);
4058 }
4059 HydroNode::Fold { init, acc, .. }
4060 | HydroNode::Scan { init, acc, .. }
4061 | HydroNode::FoldKeyed { init, acc, .. } => {
4062 transform(init);
4063 transform(acc);
4064 }
4065 HydroNode::Network {
4066 serialize_fn,
4067 deserialize_fn,
4068 ..
4069 } => {
4070 if let Some(serialize_fn) = serialize_fn {
4071 transform(serialize_fn);
4072 }
4073 if let Some(deserialize_fn) = deserialize_fn {
4074 transform(deserialize_fn);
4075 }
4076 }
4077 HydroNode::ExternalInput { deserialize_fn, .. } => {
4078 if let Some(deserialize_fn) = deserialize_fn {
4079 transform(deserialize_fn);
4080 }
4081 }
4082 HydroNode::Counter { duration, .. } => {
4083 transform(duration);
4084 }
4085 }
4086 }
4087
4088 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
4089 &self.metadata().op
4090 }
4091
4092 pub fn metadata(&self) -> &HydroIrMetadata {
4093 match self {
4094 HydroNode::Placeholder => {
4095 panic!()
4096 }
4097 HydroNode::Cast { metadata, .. } => metadata,
4098 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4099 HydroNode::Source { metadata, .. } => metadata,
4100 HydroNode::SingletonSource { metadata, .. } => metadata,
4101 HydroNode::CycleSource { metadata, .. } => metadata,
4102 HydroNode::Tee { metadata, .. } => metadata,
4103 HydroNode::Partition { metadata, .. } => metadata,
4104 HydroNode::YieldConcat { metadata, .. } => metadata,
4105 HydroNode::BeginAtomic { metadata, .. } => metadata,
4106 HydroNode::EndAtomic { metadata, .. } => metadata,
4107 HydroNode::Batch { metadata, .. } => metadata,
4108 HydroNode::Chain { metadata, .. } => metadata,
4109 HydroNode::ChainFirst { metadata, .. } => metadata,
4110 HydroNode::CrossProduct { metadata, .. } => metadata,
4111 HydroNode::CrossSingleton { metadata, .. } => metadata,
4112 HydroNode::Join { metadata, .. } => metadata,
4113 HydroNode::Difference { metadata, .. } => metadata,
4114 HydroNode::AntiJoin { metadata, .. } => metadata,
4115 HydroNode::ResolveFutures { metadata, .. } => metadata,
4116 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4117 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4118 HydroNode::Map { metadata, .. } => metadata,
4119 HydroNode::FlatMap { metadata, .. } => metadata,
4120 HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4121 HydroNode::Filter { metadata, .. } => metadata,
4122 HydroNode::FilterMap { metadata, .. } => metadata,
4123 HydroNode::DeferTick { metadata, .. } => metadata,
4124 HydroNode::Enumerate { metadata, .. } => metadata,
4125 HydroNode::Inspect { metadata, .. } => metadata,
4126 HydroNode::Unique { metadata, .. } => metadata,
4127 HydroNode::Sort { metadata, .. } => metadata,
4128 HydroNode::Scan { metadata, .. } => metadata,
4129 HydroNode::Fold { metadata, .. } => metadata,
4130 HydroNode::FoldKeyed { metadata, .. } => metadata,
4131 HydroNode::Reduce { metadata, .. } => metadata,
4132 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4133 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4134 HydroNode::ExternalInput { metadata, .. } => metadata,
4135 HydroNode::Network { metadata, .. } => metadata,
4136 HydroNode::Counter { metadata, .. } => metadata,
4137 }
4138 }
4139
4140 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
4141 &mut self.metadata_mut().op
4142 }
4143
4144 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
4145 match self {
4146 HydroNode::Placeholder => {
4147 panic!()
4148 }
4149 HydroNode::Cast { metadata, .. } => metadata,
4150 HydroNode::ObserveNonDet { metadata, .. } => metadata,
4151 HydroNode::Source { metadata, .. } => metadata,
4152 HydroNode::SingletonSource { metadata, .. } => metadata,
4153 HydroNode::CycleSource { metadata, .. } => metadata,
4154 HydroNode::Tee { metadata, .. } => metadata,
4155 HydroNode::Partition { metadata, .. } => metadata,
4156 HydroNode::YieldConcat { metadata, .. } => metadata,
4157 HydroNode::BeginAtomic { metadata, .. } => metadata,
4158 HydroNode::EndAtomic { metadata, .. } => metadata,
4159 HydroNode::Batch { metadata, .. } => metadata,
4160 HydroNode::Chain { metadata, .. } => metadata,
4161 HydroNode::ChainFirst { metadata, .. } => metadata,
4162 HydroNode::CrossProduct { metadata, .. } => metadata,
4163 HydroNode::CrossSingleton { metadata, .. } => metadata,
4164 HydroNode::Join { metadata, .. } => metadata,
4165 HydroNode::Difference { metadata, .. } => metadata,
4166 HydroNode::AntiJoin { metadata, .. } => metadata,
4167 HydroNode::ResolveFutures { metadata, .. } => metadata,
4168 HydroNode::ResolveFuturesBlocking { metadata, .. } => metadata,
4169 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
4170 HydroNode::Map { metadata, .. } => metadata,
4171 HydroNode::FlatMap { metadata, .. } => metadata,
4172 HydroNode::FlatMapStreamBlocking { metadata, .. } => metadata,
4173 HydroNode::Filter { metadata, .. } => metadata,
4174 HydroNode::FilterMap { metadata, .. } => metadata,
4175 HydroNode::DeferTick { metadata, .. } => metadata,
4176 HydroNode::Enumerate { metadata, .. } => metadata,
4177 HydroNode::Inspect { metadata, .. } => metadata,
4178 HydroNode::Unique { metadata, .. } => metadata,
4179 HydroNode::Sort { metadata, .. } => metadata,
4180 HydroNode::Scan { metadata, .. } => metadata,
4181 HydroNode::Fold { metadata, .. } => metadata,
4182 HydroNode::FoldKeyed { metadata, .. } => metadata,
4183 HydroNode::Reduce { metadata, .. } => metadata,
4184 HydroNode::ReduceKeyed { metadata, .. } => metadata,
4185 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
4186 HydroNode::ExternalInput { metadata, .. } => metadata,
4187 HydroNode::Network { metadata, .. } => metadata,
4188 HydroNode::Counter { metadata, .. } => metadata,
4189 }
4190 }
4191
4192 pub fn input(&self) -> Vec<&HydroNode> {
4193 match self {
4194 HydroNode::Placeholder => {
4195 panic!()
4196 }
4197 HydroNode::Source { .. }
4198 | HydroNode::SingletonSource { .. }
4199 | HydroNode::ExternalInput { .. }
4200 | HydroNode::CycleSource { .. }
4201 | HydroNode::Tee { .. }
4202 | HydroNode::Partition { .. } => {
4203 vec![]
4205 }
4206 HydroNode::Cast { inner, .. }
4207 | HydroNode::ObserveNonDet { inner, .. }
4208 | HydroNode::YieldConcat { inner, .. }
4209 | HydroNode::BeginAtomic { inner, .. }
4210 | HydroNode::EndAtomic { inner, .. }
4211 | HydroNode::Batch { inner, .. } => {
4212 vec![inner]
4213 }
4214 HydroNode::Chain { first, second, .. } => {
4215 vec![first, second]
4216 }
4217 HydroNode::ChainFirst { first, second, .. } => {
4218 vec![first, second]
4219 }
4220 HydroNode::CrossProduct { left, right, .. }
4221 | HydroNode::CrossSingleton { left, right, .. }
4222 | HydroNode::Join { left, right, .. } => {
4223 vec![left, right]
4224 }
4225 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
4226 vec![pos, neg]
4227 }
4228 HydroNode::Map { input, .. }
4229 | HydroNode::FlatMap { input, .. }
4230 | HydroNode::FlatMapStreamBlocking { input, .. }
4231 | HydroNode::Filter { input, .. }
4232 | HydroNode::FilterMap { input, .. }
4233 | HydroNode::Sort { input, .. }
4234 | HydroNode::DeferTick { input, .. }
4235 | HydroNode::Enumerate { input, .. }
4236 | HydroNode::Inspect { input, .. }
4237 | HydroNode::Unique { input, .. }
4238 | HydroNode::Network { input, .. }
4239 | HydroNode::Counter { input, .. }
4240 | HydroNode::ResolveFutures { input, .. }
4241 | HydroNode::ResolveFuturesBlocking { input, .. }
4242 | HydroNode::ResolveFuturesOrdered { input, .. }
4243 | HydroNode::Fold { input, .. }
4244 | HydroNode::FoldKeyed { input, .. }
4245 | HydroNode::Reduce { input, .. }
4246 | HydroNode::ReduceKeyed { input, .. }
4247 | HydroNode::Scan { input, .. } => {
4248 vec![input]
4249 }
4250 HydroNode::ReduceKeyedWatermark {
4251 input, watermark, ..
4252 } => {
4253 vec![input, watermark]
4254 }
4255 }
4256 }
4257
4258 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
4259 self.input()
4260 .iter()
4261 .map(|input_node| input_node.metadata())
4262 .collect()
4263 }
4264
4265 pub fn is_shared_with_others(&self) -> bool {
4269 match self {
4270 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
4271 Rc::strong_count(&inner.0) > 1
4272 }
4273 _ => false,
4274 }
4275 }
4276
4277 pub fn print_root(&self) -> String {
4278 match self {
4279 HydroNode::Placeholder => {
4280 panic!()
4281 }
4282 HydroNode::Cast { .. } => "Cast()".to_owned(),
4283 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
4284 HydroNode::Source { source, .. } => format!("Source({:?})", source),
4285 HydroNode::SingletonSource {
4286 value,
4287 first_tick_only,
4288 ..
4289 } => format!(
4290 "SingletonSource({:?}, first_tick_only={})",
4291 value, first_tick_only
4292 ),
4293 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
4294 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
4295 HydroNode::Partition { f, is_true, .. } => {
4296 format!("Partition({:?}, is_true={})", f, is_true)
4297 }
4298 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
4299 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
4300 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
4301 HydroNode::Batch { .. } => "Batch()".to_owned(),
4302 HydroNode::Chain { first, second, .. } => {
4303 format!("Chain({}, {})", first.print_root(), second.print_root())
4304 }
4305 HydroNode::ChainFirst { first, second, .. } => {
4306 format!(
4307 "ChainFirst({}, {})",
4308 first.print_root(),
4309 second.print_root()
4310 )
4311 }
4312 HydroNode::CrossProduct { left, right, .. } => {
4313 format!(
4314 "CrossProduct({}, {})",
4315 left.print_root(),
4316 right.print_root()
4317 )
4318 }
4319 HydroNode::CrossSingleton { left, right, .. } => {
4320 format!(
4321 "CrossSingleton({}, {})",
4322 left.print_root(),
4323 right.print_root()
4324 )
4325 }
4326 HydroNode::Join { left, right, .. } => {
4327 format!("Join({}, {})", left.print_root(), right.print_root())
4328 }
4329 HydroNode::Difference { pos, neg, .. } => {
4330 format!("Difference({}, {})", pos.print_root(), neg.print_root())
4331 }
4332 HydroNode::AntiJoin { pos, neg, .. } => {
4333 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
4334 }
4335 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
4336 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
4337 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
4338 HydroNode::Map { f, .. } => format!("Map({:?})", f),
4339 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
4340 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
4341 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
4342 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
4343 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
4344 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
4345 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
4346 HydroNode::Unique { .. } => "Unique()".to_owned(),
4347 HydroNode::Sort { .. } => "Sort()".to_owned(),
4348 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
4349 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
4350 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
4351 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
4352 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
4353 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
4354 HydroNode::Network { .. } => "Network()".to_owned(),
4355 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
4356 HydroNode::Counter { tag, duration, .. } => {
4357 format!("Counter({:?}, {:?})", tag, duration)
4358 }
4359 }
4360 }
4361}
4362
4363#[cfg(feature = "build")]
4364fn instantiate_network<'a, D>(
4365 env: &mut D::InstantiateEnv,
4366 from_location: &LocationId,
4367 to_location: &LocationId,
4368 processes: &SparseSecondaryMap<LocationKey, D::Process>,
4369 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
4370 name: Option<&str>,
4371 networking_info: &crate::networking::NetworkingInfo,
4372) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
4373where
4374 D: Deploy<'a>,
4375{
4376 let ((sink, source), connect_fn) = match (from_location, to_location) {
4377 (&LocationId::Process(from), &LocationId::Process(to)) => {
4378 let from_node = processes
4379 .get(from)
4380 .unwrap_or_else(|| {
4381 panic!("A process used in the graph was not instantiated: {}", from)
4382 })
4383 .clone();
4384 let to_node = processes
4385 .get(to)
4386 .unwrap_or_else(|| {
4387 panic!("A process used in the graph was not instantiated: {}", to)
4388 })
4389 .clone();
4390
4391 let sink_port = from_node.next_port();
4392 let source_port = to_node.next_port();
4393
4394 (
4395 D::o2o_sink_source(
4396 env,
4397 &from_node,
4398 &sink_port,
4399 &to_node,
4400 &source_port,
4401 name,
4402 networking_info,
4403 ),
4404 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
4405 )
4406 }
4407 (&LocationId::Process(from), &LocationId::Cluster(to)) => {
4408 let from_node = processes
4409 .get(from)
4410 .unwrap_or_else(|| {
4411 panic!("A process used in the graph was not instantiated: {}", from)
4412 })
4413 .clone();
4414 let to_node = clusters
4415 .get(to)
4416 .unwrap_or_else(|| {
4417 panic!("A cluster used in the graph was not instantiated: {}", to)
4418 })
4419 .clone();
4420
4421 let sink_port = from_node.next_port();
4422 let source_port = to_node.next_port();
4423
4424 (
4425 D::o2m_sink_source(
4426 env,
4427 &from_node,
4428 &sink_port,
4429 &to_node,
4430 &source_port,
4431 name,
4432 networking_info,
4433 ),
4434 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
4435 )
4436 }
4437 (&LocationId::Cluster(from), &LocationId::Process(to)) => {
4438 let from_node = clusters
4439 .get(from)
4440 .unwrap_or_else(|| {
4441 panic!("A cluster used in the graph was not instantiated: {}", from)
4442 })
4443 .clone();
4444 let to_node = processes
4445 .get(to)
4446 .unwrap_or_else(|| {
4447 panic!("A process used in the graph was not instantiated: {}", to)
4448 })
4449 .clone();
4450
4451 let sink_port = from_node.next_port();
4452 let source_port = to_node.next_port();
4453
4454 (
4455 D::m2o_sink_source(
4456 env,
4457 &from_node,
4458 &sink_port,
4459 &to_node,
4460 &source_port,
4461 name,
4462 networking_info,
4463 ),
4464 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
4465 )
4466 }
4467 (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
4468 let from_node = clusters
4469 .get(from)
4470 .unwrap_or_else(|| {
4471 panic!("A cluster used in the graph was not instantiated: {}", from)
4472 })
4473 .clone();
4474 let to_node = clusters
4475 .get(to)
4476 .unwrap_or_else(|| {
4477 panic!("A cluster used in the graph was not instantiated: {}", to)
4478 })
4479 .clone();
4480
4481 let sink_port = from_node.next_port();
4482 let source_port = to_node.next_port();
4483
4484 (
4485 D::m2m_sink_source(
4486 env,
4487 &from_node,
4488 &sink_port,
4489 &to_node,
4490 &source_port,
4491 name,
4492 networking_info,
4493 ),
4494 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
4495 )
4496 }
4497 (LocationId::Tick(_, _), _) => panic!(),
4498 (_, LocationId::Tick(_, _)) => panic!(),
4499 (LocationId::Atomic(_), _) => panic!(),
4500 (_, LocationId::Atomic(_)) => panic!(),
4501 };
4502 (sink, source, connect_fn)
4503}
4504
4505#[cfg(test)]
4506mod test {
4507 use std::mem::size_of;
4508
4509 use stageleft::{QuotedWithContext, q};
4510
4511 use super::*;
4512
4513 #[test]
4514 #[cfg_attr(
4515 not(feature = "build"),
4516 ignore = "expects inclusion of feature-gated fields"
4517 )]
4518 fn hydro_node_size() {
4519 assert_eq!(size_of::<HydroNode>(), 248);
4520 }
4521
4522 #[test]
4523 #[cfg_attr(
4524 not(feature = "build"),
4525 ignore = "expects inclusion of feature-gated fields"
4526 )]
4527 fn hydro_root_size() {
4528 assert_eq!(size_of::<HydroRoot>(), 136);
4529 }
4530
4531 #[test]
4532 fn test_simplify_q_macro_basic() {
4533 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
4535 let result = simplify_q_macro(simple_expr.clone());
4536 assert_eq!(result, simple_expr);
4537 }
4538
4539 #[test]
4540 fn test_simplify_q_macro_actual_stageleft_call() {
4541 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
4543 let result = simplify_q_macro(stageleft_call);
4544 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4547 }
4548
4549 #[test]
4550 fn test_closure_no_pipe_at_start() {
4551 let stageleft_call = q!({
4553 let foo = 123;
4554 move |b: usize| b + foo
4555 })
4556 .splice_fn1_ctx(&());
4557 let result = simplify_q_macro(stageleft_call);
4558 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4559 }
4560}