1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24#[cfg(feature = "build")]
25use crate::compile::deploy_provider::{Deploy, RegisterPort};
26use crate::location::NetworkHint;
27use crate::location::dynamic::LocationId;
28
29pub mod backtrace;
30use backtrace::Backtrace;
31
32#[derive(Clone, Hash)]
36pub struct DebugExpr(pub Box<syn::Expr>);
37
38impl From<syn::Expr> for DebugExpr {
39 fn from(expr: syn::Expr) -> Self {
40 Self(Box::new(expr))
41 }
42}
43
44impl Deref for DebugExpr {
45 type Target = syn::Expr;
46
47 fn deref(&self) -> &Self::Target {
48 &self.0
49 }
50}
51
52impl ToTokens for DebugExpr {
53 fn to_tokens(&self, tokens: &mut TokenStream) {
54 self.0.to_tokens(tokens);
55 }
56}
57
58impl Debug for DebugExpr {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 write!(f, "{}", self.0.to_token_stream())
61 }
62}
63
64impl Display for DebugExpr {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 let original = self.0.as_ref().clone();
67 let simplified = simplify_q_macro(original);
68
69 write!(f, "q!({})", quote::quote!(#simplified))
72 }
73}
74
75fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
77 let mut simplifier = QMacroSimplifier::new();
80 simplifier.visit_expr_mut(&mut expr);
81
82 if let Some(simplified) = simplifier.simplified_result {
84 simplified
85 } else {
86 expr
87 }
88}
89
90#[derive(Default)]
92pub struct QMacroSimplifier {
93 pub simplified_result: Option<syn::Expr>,
94}
95
96impl QMacroSimplifier {
97 pub fn new() -> Self {
98 Self::default()
99 }
100}
101
102impl VisitMut for QMacroSimplifier {
103 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
104 if self.simplified_result.is_some() {
106 return;
107 }
108
109 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
110 && self.is_stageleft_runtime_support_call(&path_expr.path)
112 && let Some(closure) = self.extract_closure_from_args(&call.args)
114 {
115 self.simplified_result = Some(closure);
116 return;
117 }
118
119 syn::visit_mut::visit_expr_mut(self, expr);
122 }
123}
124
125impl QMacroSimplifier {
126 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
127 if let Some(last_segment) = path.segments.last() {
129 let fn_name = last_segment.ident.to_string();
130 fn_name.contains("_type_hint")
132 && path.segments.len() > 2
133 && path.segments[0].ident == "stageleft"
134 && path.segments[1].ident == "runtime_support"
135 } else {
136 false
137 }
138 }
139
140 fn extract_closure_from_args(
141 &self,
142 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
143 ) -> Option<syn::Expr> {
144 for arg in args {
146 if let syn::Expr::Closure(_) = arg {
147 return Some(arg.clone());
148 }
149 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
151 return Some(closure_expr);
152 }
153 }
154 None
155 }
156
157 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
158 let mut visitor = ClosureFinder {
159 found_closure: None,
160 prefer_inner_blocks: true,
161 };
162 visitor.visit_expr(expr);
163 visitor.found_closure
164 }
165}
166
167struct ClosureFinder {
169 found_closure: Option<syn::Expr>,
170 prefer_inner_blocks: bool,
171}
172
173impl<'ast> Visit<'ast> for ClosureFinder {
174 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
175 if self.found_closure.is_some() {
177 return;
178 }
179
180 match expr {
181 syn::Expr::Closure(_) => {
182 self.found_closure = Some(expr.clone());
183 }
184 syn::Expr::Block(block) if self.prefer_inner_blocks => {
185 for stmt in &block.block.stmts {
187 if let syn::Stmt::Expr(stmt_expr, _) = stmt
188 && let syn::Expr::Block(_) = stmt_expr
189 {
190 let mut inner_visitor = ClosureFinder {
192 found_closure: None,
193 prefer_inner_blocks: false, };
195 inner_visitor.visit_expr(stmt_expr);
196 if inner_visitor.found_closure.is_some() {
197 self.found_closure = Some(stmt_expr.clone());
199 return;
200 }
201 }
202 }
203
204 visit::visit_expr(self, expr);
206
207 if self.found_closure.is_some() {
210 }
212 }
213 _ => {
214 visit::visit_expr(self, expr);
216 }
217 }
218 }
219}
220
221#[derive(Clone, PartialEq, Eq, Hash)]
225pub struct DebugType(pub Box<syn::Type>);
226
227impl From<syn::Type> for DebugType {
228 fn from(t: syn::Type) -> Self {
229 Self(Box::new(t))
230 }
231}
232
233impl Deref for DebugType {
234 type Target = syn::Type;
235
236 fn deref(&self) -> &Self::Target {
237 &self.0
238 }
239}
240
241impl ToTokens for DebugType {
242 fn to_tokens(&self, tokens: &mut TokenStream) {
243 self.0.to_tokens(tokens);
244 }
245}
246
247impl Debug for DebugType {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 write!(f, "{}", self.0.to_token_stream())
250 }
251}
252
/// Instantiation state of a network-like IR element.
///
/// Elements start as `Building`; `HydroRoot::compile_network` replaces that
/// with `Finalized`, and `HydroRoot::connect_network` panics on `Building`.
pub enum DebugInstantiate {
    /// Not yet compiled; no sink/source expressions exist.
    Building,
    /// Compiled: carries the sink/source expressions and the connect closure.
    Finalized(Box<DebugInstantiateFinalized>),
}
257
/// Payload of `DebugInstantiate::Finalized`.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression used as the sink when emitting DFIR.
    sink: syn::Expr,
    /// Expression used as the source when emitting DFIR.
    source: syn::Expr,
    /// One-shot connect callback; taken (left as `None`) when
    /// `HydroRoot::connect_network` runs it.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
270
271impl From<DebugInstantiateFinalized> for DebugInstantiate {
272 fn from(f: DebugInstantiateFinalized) -> Self {
273 Self::Finalized(Box::new(f))
274 }
275}
276
277impl Debug for DebugInstantiate {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 write!(f, "<network instantiate>")
280 }
281}
282
283impl Hash for DebugInstantiate {
284 fn hash<H: Hasher>(&self, _state: &mut H) {
285 }
287}
288
289impl Clone for DebugInstantiate {
290 fn clone(&self) -> Self {
291 match self {
292 DebugInstantiate::Building => DebugInstantiate::Building,
293 DebugInstantiate::Finalized(_) => {
294 panic!("DebugInstantiate::Finalized should not be cloned")
295 }
296 }
297 }
298}
299
/// The kinds of source that a `HydroNode::Source` can carry.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Source described by an expression (presumably a stream — confirm at the emit site).
    Stream(DebugExpr),
    /// Source fed from an external network; carries no payload.
    ExternalNetwork(),
    /// Source described by an expression (presumably an iterator — confirm at the emit site).
    Iter(DebugExpr),
    /// Payload-free source; semantics are defined where it is emitted.
    Spin(),
    /// Source tied to the given location (presumably a cluster's member list — confirm).
    ClusterMembers(LocationId),
}
309
/// Target that receives DFIR statements while the IR is emitted.
///
/// Only declarations live here; the `BTreeMap<usize, FlatGraphBuilder>`
/// implementation in this file shows one concrete behavior.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Implementation-defined flag (the `BTreeMap` implementation returns `false`).
    fn singleton_intermediates(&self) -> bool;

    /// Returns the mutable graph builder associated with `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits the statement(s) connecting `in_ident` to `out_ident` for a batch operation.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the statement(s) connecting `in_ident` to `out_ident` when yielding out of a tick.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits the statement(s) connecting `in_ident` to `out_ident` at the start of an atomic section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits the statement(s) connecting `in_ident` to `out_ident` at the end of an atomic section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits the statement(s) connecting `in_ident` to `out_ident` for a
    /// non-determinism observation at `location`.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits the sending side at `from` (fed by `input_ident`, into `sink`)
    /// and the receiving side at `to` (from `source`, bound to `out_ident`),
    /// with optional `serialize` / `deserialize` stages. `tag_id` identifies
    /// the emitted statements.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Emits a source at `on` reading from `source_expr`, bound to
    /// `out_ident`, with an optional `deserialize` stage.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Emits an output at `on` sending `input_ident` into `sink_expr`, with
    /// an optional `serialize` stage.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    );
}
402
/// Adds the pass-through statement `#out_ident = #in_ident;` to `builder`.
#[cfg(feature = "build")]
fn add_identity_assignment(
    builder: &mut FlatGraphBuilder,
    in_ident: syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}

/// One `FlatGraphBuilder` per root location id. Batch, tick-yield, atomic
/// and non-determinism operations are all emitted as plain pass-through
/// assignments.
#[cfg(feature = "build")]
impl DfirBuilder for BTreeMap<usize, FlatGraphBuilder> {
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Looks up (creating on first use) the builder keyed by the raw id of
    /// `location`'s root.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let key = location.root().raw_id();
        self.entry(key).or_default()
    }

    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_identity_assignment(self.get_dfir_mut(in_location.root()), in_ident, out_ident);
    }

    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        add_identity_assignment(self.get_dfir_mut(in_location.root()), in_ident, out_ident);
    }

    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_identity_assignment(self.get_dfir_mut(in_location.root()), in_ident, out_ident);
    }

    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        add_identity_assignment(self.get_dfir_mut(in_location.root()), in_ident, out_ident);
    }

    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        add_identity_assignment(self.get_dfir_mut(location), in_ident, out_ident);
    }

    /// Adds the `send{tag_id}` statement to the `from` builder and then the
    /// `recv{tag_id}` statement to the `to` builder.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        let send_tag = format!("send{}", tag_id);
        let recv_tag = format!("recv{}", tag_id);

        let sender_builder = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }

        let receiver_builder = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Adds a `recv{tag_id}` source statement to the builder for `on`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let receiver_builder = self.get_dfir_mut(on);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Adds a `send{tag_id}` sink statement to the builder for `on`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        let send_tag = format!("send{}", tag_id);
        let sender_builder = self.get_dfir_mut(on);
        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }
    }
}
615
/// Selects what `emit_core` does at each element: write DFIR into a
/// [`DfirBuilder`], or invoke user callbacks instead.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR statements into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Call `L` for roots and `N` for nodes, passing the current statement id.
    Callback(L, N),
}
625
/// A root of the IR graph: an operation that consumes a `HydroNode` input.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Emitted as `input -> for_each(f)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to an external; `instantiate_fn` is filled in by
    /// `compile_network`.
    SendExternal {
        to_external_id: usize,
        to_key: usize,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `input -> dest_sink(sink)`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Emitted as `ident = input`.
    CycleSink {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
657
658impl HydroRoot {
659 #[cfg(feature = "build")]
660 pub fn compile_network<'a, D>(
661 &mut self,
662 extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
663 seen_tees: &mut SeenTees,
664 processes: &HashMap<usize, D::Process>,
665 clusters: &HashMap<usize, D::Cluster>,
666 externals: &HashMap<usize, D::External>,
667 ) where
668 D: Deploy<'a>,
669 {
670 let refcell_extra_stmts = RefCell::new(extra_stmts);
671 self.transform_bottom_up(
672 &mut |l| {
673 if let HydroRoot::SendExternal {
674 input,
675 to_external_id,
676 to_key,
677 to_many,
678 unpaired,
679 instantiate_fn,
680 ..
681 } = l
682 {
683 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
684 DebugInstantiate::Building => {
685 let to_node = externals
686 .get(to_external_id)
687 .unwrap_or_else(|| {
688 panic!("A external used in the graph was not instantiated: {}", to_external_id)
689 })
690 .clone();
691
692 match input.metadata().location_kind.root() {
693 LocationId::Process(process_id) => {
694 if *to_many {
695 (
696 (
697 D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
698 parse_quote!(DUMMY),
699 ),
700 Box::new(|| {}) as Box<dyn FnOnce()>,
701 )
702 } else {
703 let from_node = processes
704 .get(process_id)
705 .unwrap_or_else(|| {
706 panic!("A process used in the graph was not instantiated: {}", process_id)
707 })
708 .clone();
709
710 let sink_port = D::allocate_process_port(&from_node);
711 let source_port: <D as Deploy<'a>>::Port = D::allocate_external_port(&to_node);
712
713 if *unpaired {
714 use stageleft::quote_type;
715 use tokio_util::codec::LengthDelimitedCodec;
716
717 to_node.register(*to_key, source_port.clone());
718
719 let _ = D::e2o_source(
720 refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
721 &to_node, &source_port,
722 &from_node, &sink_port,
723 "e_type::<LengthDelimitedCodec>(),
724 format!("{}_{}", *to_external_id, *to_key)
725 );
726 }
727
728 (
729 (
730 D::o2e_sink(
731 &from_node,
732 &sink_port,
733 &to_node,
734 &source_port,
735 format!("{}_{}", *to_external_id, *to_key)
736 ),
737 parse_quote!(DUMMY),
738 ),
739 if *unpaired {
740 D::e2o_connect(
741 &to_node,
742 &source_port,
743 &from_node,
744 &sink_port,
745 *to_many,
746 NetworkHint::Auto,
747 )
748 } else {
749 Box::new(|| {}) as Box<dyn FnOnce()>
750 },
751 )
752 }
753 }
754 LocationId::Cluster(_) => todo!(),
755 _ => panic!()
756 }
757 },
758
759 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
760 };
761
762 *instantiate_fn = DebugInstantiateFinalized {
763 sink: sink_expr,
764 source: source_expr,
765 connect_fn: Some(connect_fn),
766 }
767 .into();
768 }
769 },
770 &mut |n| {
771 if let HydroNode::Network {
772 input,
773 instantiate_fn,
774 metadata,
775 ..
776 } = n
777 {
778 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
779 DebugInstantiate::Building => instantiate_network::<D>(
780 input.metadata().location_kind.root(),
781 metadata.location_kind.root(),
782 processes,
783 clusters,
784 ),
785
786 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
787 };
788
789 *instantiate_fn = DebugInstantiateFinalized {
790 sink: sink_expr,
791 source: source_expr,
792 connect_fn: Some(connect_fn),
793 }
794 .into();
795 } else if let HydroNode::ExternalInput {
796 from_external_id,
797 from_key,
798 from_many,
799 codec_type,
800 port_hint,
801 instantiate_fn,
802 metadata,
803 ..
804 } = n
805 {
806 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
807 DebugInstantiate::Building => {
808 let from_node = externals
809 .get(from_external_id)
810 .unwrap_or_else(|| {
811 panic!(
812 "A external used in the graph was not instantiated: {}",
813 from_external_id
814 )
815 })
816 .clone();
817
818 match metadata.location_kind.root() {
819 LocationId::Process(process_id) => {
820 let to_node = processes
821 .get(process_id)
822 .unwrap_or_else(|| {
823 panic!("A process used in the graph was not instantiated: {}", process_id)
824 })
825 .clone();
826
827 let sink_port = D::allocate_external_port(&from_node);
828 let source_port = D::allocate_process_port(&to_node);
829
830 from_node.register(*from_key, sink_port.clone());
831
832 (
833 (
834 parse_quote!(DUMMY),
835 if *from_many {
836 D::e2o_many_source(
837 refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
838 &to_node, &source_port,
839 codec_type.0.as_ref(),
840 format!("{}_{}", *from_external_id, *from_key)
841 )
842 } else {
843 D::e2o_source(
844 refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
845 &from_node, &sink_port,
846 &to_node, &source_port,
847 codec_type.0.as_ref(),
848 format!("{}_{}", *from_external_id, *from_key)
849 )
850 },
851 ),
852 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
853 )
854 }
855 LocationId::Cluster(_) => todo!(),
856 _ => panic!()
857 }
858 },
859
860 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
861 };
862
863 *instantiate_fn = DebugInstantiateFinalized {
864 sink: sink_expr,
865 source: source_expr,
866 connect_fn: Some(connect_fn),
867 }
868 .into();
869 }
870 },
871 seen_tees,
872 false,
873 );
874 }
875
876 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
877 self.transform_bottom_up(
878 &mut |l| {
879 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
880 match instantiate_fn {
881 DebugInstantiate::Building => panic!("network not built"),
882
883 DebugInstantiate::Finalized(finalized) => {
884 (finalized.connect_fn.take().unwrap())();
885 }
886 }
887 }
888 },
889 &mut |n| {
890 if let HydroNode::Network { instantiate_fn, .. }
891 | HydroNode::ExternalInput { instantiate_fn, .. } = n
892 {
893 match instantiate_fn {
894 DebugInstantiate::Building => panic!("network not built"),
895
896 DebugInstantiate::Finalized(finalized) => {
897 (finalized.connect_fn.take().unwrap())();
898 }
899 }
900 }
901 },
902 seen_tees,
903 false,
904 );
905 }
906
907 pub fn transform_bottom_up(
908 &mut self,
909 transform_root: &mut impl FnMut(&mut HydroRoot),
910 transform_node: &mut impl FnMut(&mut HydroNode),
911 seen_tees: &mut SeenTees,
912 check_well_formed: bool,
913 ) {
914 self.transform_children(
915 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
916 seen_tees,
917 );
918
919 transform_root(self);
920 }
921
922 pub fn transform_children(
923 &mut self,
924 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
925 seen_tees: &mut SeenTees,
926 ) {
927 match self {
928 HydroRoot::ForEach { input, .. }
929 | HydroRoot::SendExternal { input, .. }
930 | HydroRoot::DestSink { input, .. }
931 | HydroRoot::CycleSink { input, .. } => {
932 transform(input, seen_tees);
933 }
934 }
935 }
936
937 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
938 match self {
939 HydroRoot::ForEach {
940 f,
941 input,
942 op_metadata,
943 } => HydroRoot::ForEach {
944 f: f.clone(),
945 input: Box::new(input.deep_clone(seen_tees)),
946 op_metadata: op_metadata.clone(),
947 },
948 HydroRoot::SendExternal {
949 to_external_id,
950 to_key,
951 to_many,
952 unpaired,
953 serialize_fn,
954 instantiate_fn,
955 input,
956 op_metadata,
957 } => HydroRoot::SendExternal {
958 to_external_id: *to_external_id,
959 to_key: *to_key,
960 to_many: *to_many,
961 unpaired: *unpaired,
962 serialize_fn: serialize_fn.clone(),
963 instantiate_fn: instantiate_fn.clone(),
964 input: Box::new(input.deep_clone(seen_tees)),
965 op_metadata: op_metadata.clone(),
966 },
967 HydroRoot::DestSink {
968 sink,
969 input,
970 op_metadata,
971 } => HydroRoot::DestSink {
972 sink: sink.clone(),
973 input: Box::new(input.deep_clone(seen_tees)),
974 op_metadata: op_metadata.clone(),
975 },
976 HydroRoot::CycleSink {
977 ident,
978 input,
979 op_metadata,
980 } => HydroRoot::CycleSink {
981 ident: ident.clone(),
982 input: Box::new(input.deep_clone(seen_tees)),
983 op_metadata: op_metadata.clone(),
984 },
985 }
986 }
987
988 #[cfg(feature = "build")]
989 pub fn emit<'a, D: Deploy<'a>>(
990 &mut self,
991 graph_builders: &mut dyn DfirBuilder,
992 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
993 next_stmt_id: &mut usize,
994 ) {
995 self.emit_core::<D>(
996 &mut BuildersOrCallback::Builders::<
997 fn(&mut HydroRoot, &mut usize),
998 fn(&mut HydroNode, &mut usize),
999 >(graph_builders),
1000 built_tees,
1001 next_stmt_id,
1002 );
1003 }
1004
    /// Emits the input subtree and then this root.
    ///
    /// In `Builders` mode a DFIR statement is added to the builder for the
    /// input's location. In `Callback` mode the root callback is invoked
    /// instead, except for `CycleSink`, which invokes nothing.
    /// `next_stmt_id` is incremented after every variant except `CycleSink`.
    #[cfg(feature = "build")]
    pub fn emit_core<'a, D: Deploy<'a>>(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut usize),
            impl FnMut(&mut HydroNode, &mut usize),
        >,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
        next_stmt_id: &mut usize,
    ) {
        match self {
            HydroRoot::ForEach { f, input, .. } => {
                // The input is always emitted first; it yields the ident this root consumes.
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_kind)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Placeholder expressions are used when the network has not been
                        // compiled yet; only the sink half is needed here.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_kind,
                            sink_expr,
                            &input_ident,
                            serialize_fn,
                            *next_stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_kind)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&next_stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }

                *next_stmt_id += 1;
            }

            HydroRoot::CycleSink { ident, input, .. } => {
                let input_ident =
                    input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_kind)
                            .add_dfir(
                                parse_quote! {
                                    #ident = #input_ident;
                                },
                                None,
                                None,
                            );
                    }
                    // No callback and no statement-id increment for cycle sinks.
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }
        }
    }
1124
1125 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1126 match self {
1127 HydroRoot::ForEach { op_metadata, .. }
1128 | HydroRoot::SendExternal { op_metadata, .. }
1129 | HydroRoot::DestSink { op_metadata, .. }
1130 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1131 }
1132 }
1133
1134 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1135 match self {
1136 HydroRoot::ForEach { op_metadata, .. }
1137 | HydroRoot::SendExternal { op_metadata, .. }
1138 | HydroRoot::DestSink { op_metadata, .. }
1139 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1140 }
1141 }
1142
1143 pub fn input(&self) -> &HydroNode {
1144 match self {
1145 HydroRoot::ForEach { input, .. }
1146 | HydroRoot::SendExternal { input, .. }
1147 | HydroRoot::DestSink { input, .. }
1148 | HydroRoot::CycleSink { input, .. } => input,
1149 }
1150 }
1151
1152 pub fn input_metadata(&self) -> &HydroIrMetadata {
1153 self.input().metadata()
1154 }
1155
1156 pub fn print_root(&self) -> String {
1157 match self {
1158 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1159 HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
1160 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1161 HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1162 }
1163 }
1164
1165 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1166 match self {
1167 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1168 transform(f);
1169 }
1170 HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1171 }
1172 }
1173}
1174
/// Emits every root in `ir`, in order, into a fresh map of graph builders
/// and returns that map.
///
/// All roots share one statement-id counter (starting at 0) and one
/// built-tee table. `ir` is a mutable slice, matching [`traverse_dfir`] and
/// [`transform_bottom_up`]; a `&mut Vec<HydroRoot>` argument still coerces.
#[cfg(feature = "build")]
pub fn emit<'a, D: Deploy<'a>>(ir: &mut [HydroRoot]) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut builders = BTreeMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;
    for leaf in ir {
        leaf.emit::<D>(&mut builders, &mut built_tees, &mut next_stmt_id);
    }
    builders
}
1185
/// Walks every root in `ir` through `emit_core` in callback mode, passing
/// `transform_root` / `transform_node` as the callbacks. The statement-id
/// counter starts at 0 and is shared across roots.
#[cfg(feature = "build")]
pub fn traverse_dfir<'a, D: Deploy<'a>>(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut seen_tees = HashMap::new();
    let mut next_stmt_id = 0;
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);

    for root in ir {
        root.emit_core::<D>(&mut callback, &mut seen_tees, &mut next_stmt_id);
    }
}
1199
1200pub fn transform_bottom_up(
1201 ir: &mut [HydroRoot],
1202 transform_root: &mut impl FnMut(&mut HydroRoot),
1203 transform_node: &mut impl FnMut(&mut HydroNode),
1204 check_well_formed: bool,
1205) {
1206 let mut seen_tees = HashMap::new();
1207 ir.iter_mut().for_each(|leaf| {
1208 leaf.transform_bottom_up(
1209 transform_root,
1210 transform_node,
1211 &mut seen_tees,
1212 check_well_formed,
1213 );
1214 });
1215}
1216
1217pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1218 let mut seen_tees = HashMap::new();
1219 ir.iter()
1220 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1221 .collect()
1222}
1223
1224type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1225thread_local! {
1226 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1227}
1228
1229pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1230 PRINTED_TEES.with(|printed_tees| {
1231 let mut printed_tees_mut = printed_tees.borrow_mut();
1232 *printed_tees_mut = Some((0, HashMap::new()));
1233 drop(printed_tees_mut);
1234
1235 let ret = f();
1236
1237 let mut printed_tees_mut = printed_tees.borrow_mut();
1238 *printed_tees_mut = None;
1239
1240 ret
1241 })
1242}
1243
1244pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1245
1246impl TeeNode {
1247 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1248 Rc::as_ptr(&self.0)
1249 }
1250}
1251
1252impl Debug for TeeNode {
1253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1254 PRINTED_TEES.with(|printed_tees| {
1255 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1256 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1257
1258 if let Some(printed_tees_mut) = printed_tees_mut {
1259 if let Some(existing) = printed_tees_mut
1260 .1
1261 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1262 {
1263 write!(f, "<tee {}>", existing)
1264 } else {
1265 let next_id = printed_tees_mut.0;
1266 printed_tees_mut.0 += 1;
1267 printed_tees_mut
1268 .1
1269 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1270 drop(printed_tees_mut_borrow);
1271 write!(f, "<tee {}>: ", next_id)?;
1272 Debug::fmt(&self.0.borrow(), f)
1273 }
1274 } else {
1275 drop(printed_tees_mut_borrow);
1276 write!(f, "<tee>: ")?;
1277 Debug::fmt(&self.0.borrow(), f)
1278 }
1279 })
1280 }
1281}
1282
1283impl Hash for TeeNode {
1284 fn hash<H: Hasher>(&self, state: &mut H) {
1285 self.0.borrow_mut().hash(state);
1286 }
1287}
1288
/// Boundedness tag stored in [`CollectionKind`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1294
/// Ordering tag stored in the stream variants of [`CollectionKind`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1300
/// Retry tag stored in the stream variants of [`CollectionKind`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1306
/// Boundedness tag for `CollectionKind::KeyedSingleton`; unlike
/// [`BoundKind`] it has an extra `BoundedValue` level.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1313
/// Describes the kind of collection a node produces, together with its
/// element (or key/value) types.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// A stream of `element_type` values.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton of `element_type`.
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// An optional of `element_type`.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; ordering and retry are recorded for the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton with its own boundedness scale.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1343
/// Metadata attached to an IR node.
///
/// Its `Hash` and `PartialEq` implementations ignore every field, and its
/// `Debug` output shows only `location_kind` and `collection_kind`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location the node belongs to.
    pub location_kind: LocationId,
    /// Kind of collection the node produces.
    pub collection_kind: CollectionKind,
    /// Optional cardinality figure; its origin is not visible in this part of the file.
    pub cardinality: Option<usize>,
    /// Optional free-form tag.
    pub tag: Option<String>,
    /// Operator-level metadata.
    pub op: HydroIrOpMetadata,
}
1352
impl Hash for HydroIrMetadata {
    /// Contributes nothing, so metadata never affects the hash of the IR
    /// element that contains it.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}

impl PartialEq for HydroIrMetadata {
    /// Always `true`: any two metadata values compare equal (consistent with
    /// the empty `Hash` implementation).
    fn eq(&self, _: &Self) -> bool {
        true
    }
}

impl Eq for HydroIrMetadata {}
1365
1366impl Debug for HydroIrMetadata {
1367 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1368 f.debug_struct("HydroIrMetadata")
1369 .field("location_kind", &self.location_kind)
1370 .field("collection_kind", &self.collection_kind)
1371 .finish()
1372 }
1373}
1374
/// Per-operator metadata stored in [`HydroIrMetadata::op`].
///
/// The `Debug` implementation in this file prints no fields and the `Hash`
/// implementation ignores all of them.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created (see `new`).
    pub backtrace: Backtrace,
    // NOTE(review): presumably a profiling figure filled in after creation;
    // `new` leaves it as `None` — confirm units with the producer.
    pub cpu_usage: Option<f64>,
    // NOTE(review): presumably the receive-side share of network CPU usage;
    // `new` leaves it as `None` — confirm.
    pub network_recv_cpu_usage: Option<f64>,
    // NOTE(review): presumably an operator id assigned later; `new` leaves it
    // as `None` — confirm.
    pub id: Option<usize>,
}
1384
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and with
    /// `cpu_usage`, `network_recv_cpu_usage` and `id` all set to `None`.
    ///
    /// Delegates to `new_with_skip(1)`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Same as [`HydroIrOpMetadata::new`], but passes `2 + skip_count` to
    /// `Backtrace::get_backtrace`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            // NOTE(review): the constant 2 presumably accounts for stack frames
            // that are always present above the caller (this function and the
            // capture itself) — confirm against `Backtrace::get_backtrace`.
            // Do not inline or wrap these constructors without re-checking it.
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1403
1404impl Debug for HydroIrOpMetadata {
1405 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1406 f.debug_struct("HydroIrOpMetadata").finish()
1407 }
1408}
1409
1410impl Hash for HydroIrOpMetadata {
1411 fn hash<H: Hasher>(&self, _: &mut H) {}
1412}
1413
/// A node of the Hydro IR graph.
///
/// Every variant except `Placeholder` carries a `metadata` field. Child nodes
/// are owned through `Box<HydroNode>`, except for `Tee`, whose child is a
/// shared `TeeNode`.
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in. `transform_children` and `emit_core` panic when
    /// they are invoked on it; the tee handling parks it in a shared cell
    /// while the real node is moved out or being built.
    Placeholder,

    /// `emit_core` emits no statement for this node in builder mode and
    /// returns the identifier produced by `inner`.
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted through `observe_nondet` on the graph builders, which receives
    /// the `trusted` flag.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    /// A leaf node (no children) described by a `HydroSource`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// A leaf node emitted as `source_iter([value])`, optionally followed by
    /// `persist::<'static>()`.
    SingletonSource {
        value: DebugExpr,
        metadata: HydroIrMetadata,
    },

    /// A leaf node for which `emit_core` returns `ident` itself.
    CycleSource {
        ident: syn::Ident,
        metadata: HydroIrMetadata,
    },

    /// A reference to a shared node. The shared node is emitted once
    /// (followed by `tee()`); later references reuse the recorded identifier.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    /// Emitted through `begin_atomic` on the graph builders.
    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted through `end_atomic` on the graph builders.
    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted through `batch` on the graph builders.
    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted through `yield_from_tick` on the graph builders.
    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `chain()` with `first` on port 0 and `second` on port 1.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `chain_first_n(1)` with `first` on port 0 and `second` on
    /// port 1.
    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `cross_join_multiset` with `left` on port 0 and `right` on
    /// port 1.
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `cross_singleton()` with `left` on the `input` port and
    /// `right` on the `single` port.
    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `join_multiset` with `left` on port 0 and `right` on port 1.
    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `difference` with the `pos` and `neg` ports.
    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `anti_join` with the `pos` and `neg` ports.
    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `resolve_futures()` applied to `input`.
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    // NOTE(review): presumably the order-preserving counterpart of
    // `ResolveFutures` — confirm against its emission code.
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // The four variants below each pair an expression `f` with a single input.
    // NOTE(review): presumably they apply `f` as the closure of the operator
    // of the same name — confirm against their emission code.
    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node without extra parameters.
    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node without extra parameters.
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node carrying an expression `f`.
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node without extra parameters.
    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node without extra parameters.
    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    // The three variants below carry an `init` and an `acc` expression.
    // NOTE(review): presumably the initial accumulator and the accumulation
    // closure — confirm against their emission code.
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node carrying an expression `f`.
    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input node carrying an expression `f`.
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed`, with a second child node `watermark`; traversals
    /// visit `input` before `watermark`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The only variant exempt from the location check in
    /// `transform_bottom_up`: its input may have a different root location.
    Network {
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A leaf node (no children) for data coming from an external.
    ExternalInput {
        from_external_id: usize,
        from_key: usize,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    /// Single-input node carrying a `tag`, a `duration` expression and a
    /// `prefix`.
    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1642
/// Map from the address of an original tee cell to the cell that replaces it.
/// `transform_children` and `deep_clone` consult it so that a shared tee is
/// processed once and all references end up pointing at the same new cell.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Map from the address of a tee cell to a `LocationId`.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1645
1646impl HydroNode {
1647 pub fn transform_bottom_up(
1648 &mut self,
1649 transform: &mut impl FnMut(&mut HydroNode),
1650 seen_tees: &mut SeenTees,
1651 check_well_formed: bool,
1652 ) {
1653 self.transform_children(
1654 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1655 seen_tees,
1656 );
1657
1658 transform(self);
1659
1660 let self_location = self.metadata().location_kind.root();
1661
1662 if check_well_formed {
1663 match &*self {
1664 HydroNode::Network { .. } => {}
1665 _ => {
1666 self.input_metadata().iter().for_each(|i| {
1667 if i.location_kind.root() != self_location {
1668 panic!(
1669 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1670 i,
1671 i.location_kind.root(),
1672 self,
1673 self_location
1674 )
1675 }
1676 });
1677 }
1678 }
1679 }
1680 }
1681
1682 #[inline(always)]
1683 pub fn transform_children(
1684 &mut self,
1685 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1686 seen_tees: &mut SeenTees,
1687 ) {
1688 match self {
1689 HydroNode::Placeholder => {
1690 panic!();
1691 }
1692
1693 HydroNode::Source { .. }
1694 | HydroNode::SingletonSource { .. }
1695 | HydroNode::CycleSource { .. }
1696 | HydroNode::ExternalInput { .. } => {}
1697
1698 HydroNode::Tee { inner, .. } => {
1699 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1700 *inner = TeeNode(transformed.clone());
1701 } else {
1702 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1703 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1704 let mut orig = inner.0.replace(HydroNode::Placeholder);
1705 transform(&mut orig, seen_tees);
1706 *transformed_cell.borrow_mut() = orig;
1707 *inner = TeeNode(transformed_cell);
1708 }
1709 }
1710
1711 HydroNode::Cast { inner, .. }
1712 | HydroNode::ObserveNonDet { inner, .. }
1713 | HydroNode::BeginAtomic { inner, .. }
1714 | HydroNode::EndAtomic { inner, .. }
1715 | HydroNode::Batch { inner, .. }
1716 | HydroNode::YieldConcat { inner, .. } => {
1717 transform(inner.as_mut(), seen_tees);
1718 }
1719
1720 HydroNode::Chain { first, second, .. } => {
1721 transform(first.as_mut(), seen_tees);
1722 transform(second.as_mut(), seen_tees);
1723 }
1724
1725 HydroNode::ChainFirst { first, second, .. } => {
1726 transform(first.as_mut(), seen_tees);
1727 transform(second.as_mut(), seen_tees);
1728 }
1729
1730 HydroNode::CrossSingleton { left, right, .. }
1731 | HydroNode::CrossProduct { left, right, .. }
1732 | HydroNode::Join { left, right, .. } => {
1733 transform(left.as_mut(), seen_tees);
1734 transform(right.as_mut(), seen_tees);
1735 }
1736
1737 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1738 transform(pos.as_mut(), seen_tees);
1739 transform(neg.as_mut(), seen_tees);
1740 }
1741
1742 HydroNode::ReduceKeyedWatermark {
1743 input, watermark, ..
1744 } => {
1745 transform(input.as_mut(), seen_tees);
1746 transform(watermark.as_mut(), seen_tees);
1747 }
1748
1749 HydroNode::Map { input, .. }
1750 | HydroNode::ResolveFutures { input, .. }
1751 | HydroNode::ResolveFuturesOrdered { input, .. }
1752 | HydroNode::FlatMap { input, .. }
1753 | HydroNode::Filter { input, .. }
1754 | HydroNode::FilterMap { input, .. }
1755 | HydroNode::Sort { input, .. }
1756 | HydroNode::DeferTick { input, .. }
1757 | HydroNode::Enumerate { input, .. }
1758 | HydroNode::Inspect { input, .. }
1759 | HydroNode::Unique { input, .. }
1760 | HydroNode::Network { input, .. }
1761 | HydroNode::Fold { input, .. }
1762 | HydroNode::Scan { input, .. }
1763 | HydroNode::FoldKeyed { input, .. }
1764 | HydroNode::Reduce { input, .. }
1765 | HydroNode::ReduceKeyed { input, .. }
1766 | HydroNode::Counter { input, .. } => {
1767 transform(input.as_mut(), seen_tees);
1768 }
1769 }
1770 }
1771
1772 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1773 match self {
1774 HydroNode::Placeholder => HydroNode::Placeholder,
1775 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1776 inner: Box::new(inner.deep_clone(seen_tees)),
1777 metadata: metadata.clone(),
1778 },
1779 HydroNode::ObserveNonDet {
1780 inner,
1781 trusted,
1782 metadata,
1783 } => HydroNode::ObserveNonDet {
1784 inner: Box::new(inner.deep_clone(seen_tees)),
1785 trusted: *trusted,
1786 metadata: metadata.clone(),
1787 },
1788 HydroNode::Source { source, metadata } => HydroNode::Source {
1789 source: source.clone(),
1790 metadata: metadata.clone(),
1791 },
1792 HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1793 value: value.clone(),
1794 metadata: metadata.clone(),
1795 },
1796 HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1797 ident: ident.clone(),
1798 metadata: metadata.clone(),
1799 },
1800 HydroNode::Tee { inner, metadata } => {
1801 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1802 HydroNode::Tee {
1803 inner: TeeNode(transformed.clone()),
1804 metadata: metadata.clone(),
1805 }
1806 } else {
1807 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1808 seen_tees.insert(inner.as_ptr(), new_rc.clone());
1809 let cloned = inner.0.borrow().deep_clone(seen_tees);
1810 *new_rc.borrow_mut() = cloned;
1811 HydroNode::Tee {
1812 inner: TeeNode(new_rc),
1813 metadata: metadata.clone(),
1814 }
1815 }
1816 }
1817 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1818 inner: Box::new(inner.deep_clone(seen_tees)),
1819 metadata: metadata.clone(),
1820 },
1821 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1822 inner: Box::new(inner.deep_clone(seen_tees)),
1823 metadata: metadata.clone(),
1824 },
1825 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1826 inner: Box::new(inner.deep_clone(seen_tees)),
1827 metadata: metadata.clone(),
1828 },
1829 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1830 inner: Box::new(inner.deep_clone(seen_tees)),
1831 metadata: metadata.clone(),
1832 },
1833 HydroNode::Chain {
1834 first,
1835 second,
1836 metadata,
1837 } => HydroNode::Chain {
1838 first: Box::new(first.deep_clone(seen_tees)),
1839 second: Box::new(second.deep_clone(seen_tees)),
1840 metadata: metadata.clone(),
1841 },
1842 HydroNode::ChainFirst {
1843 first,
1844 second,
1845 metadata,
1846 } => HydroNode::ChainFirst {
1847 first: Box::new(first.deep_clone(seen_tees)),
1848 second: Box::new(second.deep_clone(seen_tees)),
1849 metadata: metadata.clone(),
1850 },
1851 HydroNode::CrossProduct {
1852 left,
1853 right,
1854 metadata,
1855 } => HydroNode::CrossProduct {
1856 left: Box::new(left.deep_clone(seen_tees)),
1857 right: Box::new(right.deep_clone(seen_tees)),
1858 metadata: metadata.clone(),
1859 },
1860 HydroNode::CrossSingleton {
1861 left,
1862 right,
1863 metadata,
1864 } => HydroNode::CrossSingleton {
1865 left: Box::new(left.deep_clone(seen_tees)),
1866 right: Box::new(right.deep_clone(seen_tees)),
1867 metadata: metadata.clone(),
1868 },
1869 HydroNode::Join {
1870 left,
1871 right,
1872 metadata,
1873 } => HydroNode::Join {
1874 left: Box::new(left.deep_clone(seen_tees)),
1875 right: Box::new(right.deep_clone(seen_tees)),
1876 metadata: metadata.clone(),
1877 },
1878 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1879 pos: Box::new(pos.deep_clone(seen_tees)),
1880 neg: Box::new(neg.deep_clone(seen_tees)),
1881 metadata: metadata.clone(),
1882 },
1883 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1884 pos: Box::new(pos.deep_clone(seen_tees)),
1885 neg: Box::new(neg.deep_clone(seen_tees)),
1886 metadata: metadata.clone(),
1887 },
1888 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1889 input: Box::new(input.deep_clone(seen_tees)),
1890 metadata: metadata.clone(),
1891 },
1892 HydroNode::ResolveFuturesOrdered { input, metadata } => {
1893 HydroNode::ResolveFuturesOrdered {
1894 input: Box::new(input.deep_clone(seen_tees)),
1895 metadata: metadata.clone(),
1896 }
1897 }
1898 HydroNode::Map { f, input, metadata } => HydroNode::Map {
1899 f: f.clone(),
1900 input: Box::new(input.deep_clone(seen_tees)),
1901 metadata: metadata.clone(),
1902 },
1903 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1904 f: f.clone(),
1905 input: Box::new(input.deep_clone(seen_tees)),
1906 metadata: metadata.clone(),
1907 },
1908 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1909 f: f.clone(),
1910 input: Box::new(input.deep_clone(seen_tees)),
1911 metadata: metadata.clone(),
1912 },
1913 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1914 f: f.clone(),
1915 input: Box::new(input.deep_clone(seen_tees)),
1916 metadata: metadata.clone(),
1917 },
1918 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1919 input: Box::new(input.deep_clone(seen_tees)),
1920 metadata: metadata.clone(),
1921 },
1922 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
1923 input: Box::new(input.deep_clone(seen_tees)),
1924 metadata: metadata.clone(),
1925 },
1926 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1927 f: f.clone(),
1928 input: Box::new(input.deep_clone(seen_tees)),
1929 metadata: metadata.clone(),
1930 },
1931 HydroNode::Unique { input, metadata } => HydroNode::Unique {
1932 input: Box::new(input.deep_clone(seen_tees)),
1933 metadata: metadata.clone(),
1934 },
1935 HydroNode::Sort { input, metadata } => HydroNode::Sort {
1936 input: Box::new(input.deep_clone(seen_tees)),
1937 metadata: metadata.clone(),
1938 },
1939 HydroNode::Fold {
1940 init,
1941 acc,
1942 input,
1943 metadata,
1944 } => HydroNode::Fold {
1945 init: init.clone(),
1946 acc: acc.clone(),
1947 input: Box::new(input.deep_clone(seen_tees)),
1948 metadata: metadata.clone(),
1949 },
1950 HydroNode::Scan {
1951 init,
1952 acc,
1953 input,
1954 metadata,
1955 } => HydroNode::Scan {
1956 init: init.clone(),
1957 acc: acc.clone(),
1958 input: Box::new(input.deep_clone(seen_tees)),
1959 metadata: metadata.clone(),
1960 },
1961 HydroNode::FoldKeyed {
1962 init,
1963 acc,
1964 input,
1965 metadata,
1966 } => HydroNode::FoldKeyed {
1967 init: init.clone(),
1968 acc: acc.clone(),
1969 input: Box::new(input.deep_clone(seen_tees)),
1970 metadata: metadata.clone(),
1971 },
1972 HydroNode::ReduceKeyedWatermark {
1973 f,
1974 input,
1975 watermark,
1976 metadata,
1977 } => HydroNode::ReduceKeyedWatermark {
1978 f: f.clone(),
1979 input: Box::new(input.deep_clone(seen_tees)),
1980 watermark: Box::new(watermark.deep_clone(seen_tees)),
1981 metadata: metadata.clone(),
1982 },
1983 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1984 f: f.clone(),
1985 input: Box::new(input.deep_clone(seen_tees)),
1986 metadata: metadata.clone(),
1987 },
1988 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1989 f: f.clone(),
1990 input: Box::new(input.deep_clone(seen_tees)),
1991 metadata: metadata.clone(),
1992 },
1993 HydroNode::Network {
1994 serialize_fn,
1995 instantiate_fn,
1996 deserialize_fn,
1997 input,
1998 metadata,
1999 } => HydroNode::Network {
2000 serialize_fn: serialize_fn.clone(),
2001 instantiate_fn: instantiate_fn.clone(),
2002 deserialize_fn: deserialize_fn.clone(),
2003 input: Box::new(input.deep_clone(seen_tees)),
2004 metadata: metadata.clone(),
2005 },
2006 HydroNode::ExternalInput {
2007 from_external_id,
2008 from_key,
2009 from_many,
2010 codec_type,
2011 port_hint,
2012 instantiate_fn,
2013 deserialize_fn,
2014 metadata,
2015 } => HydroNode::ExternalInput {
2016 from_external_id: *from_external_id,
2017 from_key: *from_key,
2018 from_many: *from_many,
2019 codec_type: codec_type.clone(),
2020 port_hint: *port_hint,
2021 instantiate_fn: instantiate_fn.clone(),
2022 deserialize_fn: deserialize_fn.clone(),
2023 metadata: metadata.clone(),
2024 },
2025 HydroNode::Counter {
2026 tag,
2027 duration,
2028 prefix,
2029 input,
2030 metadata,
2031 } => HydroNode::Counter {
2032 tag: tag.clone(),
2033 duration: duration.clone(),
2034 prefix: prefix.clone(),
2035 input: Box::new(input.deep_clone(seen_tees)),
2036 metadata: metadata.clone(),
2037 },
2038 }
2039 }
2040
2041 #[cfg(feature = "build")]
2042 pub fn emit_core<'a, D: Deploy<'a>>(
2043 &mut self,
2044 builders_or_callback: &mut BuildersOrCallback<
2045 impl FnMut(&mut HydroRoot, &mut usize),
2046 impl FnMut(&mut HydroNode, &mut usize),
2047 >,
2048 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2049 next_stmt_id: &mut usize,
2050 ) -> syn::Ident {
2051 let out_location = self.metadata().location_kind.clone();
2052 match self {
2053 HydroNode::Placeholder => {
2054 panic!()
2055 }
2056
2057 HydroNode::Cast { inner, .. } => {
2058 let inner_ident =
2059 inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2060
2061 match builders_or_callback {
2062 BuildersOrCallback::Builders(_) => {}
2063 BuildersOrCallback::Callback(_, node_callback) => {
2064 node_callback(self, next_stmt_id);
2065 }
2066 }
2067
2068 *next_stmt_id += 1;
2069
2070 inner_ident
2071 }
2072
2073 HydroNode::ObserveNonDet {
2074 inner,
2075 trusted,
2076 metadata,
2077 ..
2078 } => {
2079 let inner_ident =
2080 inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2081
2082 let observe_ident =
2083 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2084
2085 match builders_or_callback {
2086 BuildersOrCallback::Builders(graph_builders) => {
2087 graph_builders.observe_nondet(
2088 *trusted,
2089 &inner.metadata().location_kind,
2090 inner_ident,
2091 &inner.metadata().collection_kind,
2092 &observe_ident,
2093 &metadata.collection_kind,
2094 &metadata.op,
2095 );
2096 }
2097 BuildersOrCallback::Callback(_, node_callback) => {
2098 node_callback(self, next_stmt_id);
2099 }
2100 }
2101
2102 *next_stmt_id += 1;
2103
2104 observe_ident
2105 }
2106
2107 HydroNode::Batch {
2108 inner, metadata, ..
2109 } => {
2110 let inner_ident =
2111 inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2112
2113 let batch_ident =
2114 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2115
2116 match builders_or_callback {
2117 BuildersOrCallback::Builders(graph_builders) => {
2118 graph_builders.batch(
2119 inner_ident,
2120 &inner.metadata().location_kind,
2121 &inner.metadata().collection_kind,
2122 &batch_ident,
2123 &out_location,
2124 &metadata.op,
2125 );
2126 }
2127 BuildersOrCallback::Callback(_, node_callback) => {
2128 node_callback(self, next_stmt_id);
2129 }
2130 }
2131
2132 *next_stmt_id += 1;
2133
2134 batch_ident
2135 }
2136
2137 HydroNode::YieldConcat { inner, .. } => {
2138 let inner_ident =
2139 inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2140
2141 let yield_ident =
2142 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2143
2144 match builders_or_callback {
2145 BuildersOrCallback::Builders(graph_builders) => {
2146 graph_builders.yield_from_tick(
2147 inner_ident,
2148 &inner.metadata().location_kind,
2149 &inner.metadata().collection_kind,
2150 &yield_ident,
2151 &out_location,
2152 );
2153 }
2154 BuildersOrCallback::Callback(_, node_callback) => {
2155 node_callback(self, next_stmt_id);
2156 }
2157 }
2158
2159 *next_stmt_id += 1;
2160
2161 yield_ident
2162 }
2163
2164 HydroNode::BeginAtomic { inner, metadata } => {
2165 let inner_ident =
2166 inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2167
2168 let begin_ident =
2169 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2170
2171 match builders_or_callback {
2172 BuildersOrCallback::Builders(graph_builders) => {
2173 graph_builders.begin_atomic(
2174 inner_ident,
2175 &inner.metadata().location_kind,
2176 &inner.metadata().collection_kind,
2177 &begin_ident,
2178 &out_location,
2179 &metadata.op,
2180 );
2181 }
2182 BuildersOrCallback::Callback(_, node_callback) => {
2183 node_callback(self, next_stmt_id);
2184 }
2185 }
2186
2187 *next_stmt_id += 1;
2188
2189 begin_ident
2190 }
2191
2192 HydroNode::EndAtomic { inner, .. } => {
2193 let inner_ident =
2194 inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2195
2196 let end_ident =
2197 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2198
2199 match builders_or_callback {
2200 BuildersOrCallback::Builders(graph_builders) => {
2201 graph_builders.end_atomic(
2202 inner_ident,
2203 &inner.metadata().location_kind,
2204 &inner.metadata().collection_kind,
2205 &end_ident,
2206 );
2207 }
2208 BuildersOrCallback::Callback(_, node_callback) => {
2209 node_callback(self, next_stmt_id);
2210 }
2211 }
2212
2213 *next_stmt_id += 1;
2214
2215 end_ident
2216 }
2217
2218 HydroNode::Source {
2219 source, metadata, ..
2220 } => {
2221 if let HydroSource::ExternalNetwork() = source {
2222 syn::Ident::new("DUMMY", Span::call_site())
2223 } else {
2224 let source_ident =
2225 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2226
2227 let source_stmt = match source {
2228 HydroSource::Stream(expr) => {
2229 debug_assert!(metadata.location_kind.is_top_level());
2230 parse_quote! {
2231 #source_ident = source_stream(#expr);
2232 }
2233 }
2234
2235 HydroSource::ExternalNetwork() => {
2236 unreachable!()
2237 }
2238
2239 HydroSource::Iter(expr) => {
2240 if metadata.location_kind.is_top_level() {
2241 parse_quote! {
2242 #source_ident = source_iter(#expr);
2243 }
2244 } else {
2245 parse_quote! {
2247 #source_ident = source_iter(#expr) -> persist::<'static>();
2248 }
2249 }
2250 }
2251
2252 HydroSource::Spin() => {
2253 debug_assert!(metadata.location_kind.is_top_level());
2254 parse_quote! {
2255 #source_ident = spin();
2256 }
2257 }
2258
2259 HydroSource::ClusterMembers(location_id) => {
2260 debug_assert!(metadata.location_kind.is_top_level());
2261
2262 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2263 D::cluster_membership_stream(location_id),
2264 &(),
2265 );
2266
2267 parse_quote! {
2268 #source_ident = source_stream(#expr);
2269 }
2270 }
2271 };
2272
2273 match builders_or_callback {
2274 BuildersOrCallback::Builders(graph_builders) => {
2275 let builder = graph_builders.get_dfir_mut(&out_location);
2276 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2277 }
2278 BuildersOrCallback::Callback(_, node_callback) => {
2279 node_callback(self, next_stmt_id);
2280 }
2281 }
2282
2283 *next_stmt_id += 1;
2284
2285 source_ident
2286 }
2287 }
2288
2289 HydroNode::SingletonSource { value, metadata } => {
2290 let source_ident =
2291 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2292
2293 match builders_or_callback {
2294 BuildersOrCallback::Builders(graph_builders) => {
2295 let should_replay = !graph_builders.singleton_intermediates();
2296 let builder = graph_builders.get_dfir_mut(&out_location);
2297
2298 if should_replay || !metadata.location_kind.is_top_level() {
2299 builder.add_dfir(
2300 parse_quote! {
2301 #source_ident = source_iter([#value]) -> persist::<'static>();
2302 },
2303 None,
2304 Some(&next_stmt_id.to_string()),
2305 );
2306 } else {
2307 builder.add_dfir(
2308 parse_quote! {
2309 #source_ident = source_iter([#value]);
2310 },
2311 None,
2312 Some(&next_stmt_id.to_string()),
2313 );
2314 }
2315 }
2316 BuildersOrCallback::Callback(_, node_callback) => {
2317 node_callback(self, next_stmt_id);
2318 }
2319 }
2320
2321 *next_stmt_id += 1;
2322
2323 source_ident
2324 }
2325
2326 HydroNode::CycleSource { ident, .. } => {
2327 let ident = ident.clone();
2328
2329 match builders_or_callback {
2330 BuildersOrCallback::Builders(_) => {}
2331 BuildersOrCallback::Callback(_, node_callback) => {
2332 node_callback(self, next_stmt_id);
2333 }
2334 }
2335
2336 *next_stmt_id += 1;
2338
2339 ident
2340 }
2341
2342 HydroNode::Tee { inner, .. } => {
2343 let ret_ident = if let Some(teed_from) =
2344 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2345 {
2346 match builders_or_callback {
2347 BuildersOrCallback::Builders(_) => {}
2348 BuildersOrCallback::Callback(_, node_callback) => {
2349 node_callback(self, next_stmt_id);
2350 }
2351 }
2352
2353 teed_from.clone()
2354 } else {
2355 let inner_ident = inner.0.borrow_mut().emit_core::<D>(
2356 builders_or_callback,
2357 built_tees,
2358 next_stmt_id,
2359 );
2360
2361 let tee_ident =
2362 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2363
2364 built_tees.insert(
2365 inner.0.as_ref() as *const RefCell<HydroNode>,
2366 tee_ident.clone(),
2367 );
2368
2369 match builders_or_callback {
2370 BuildersOrCallback::Builders(graph_builders) => {
2371 let builder = graph_builders.get_dfir_mut(&out_location);
2372 builder.add_dfir(
2373 parse_quote! {
2374 #tee_ident = #inner_ident -> tee();
2375 },
2376 None,
2377 Some(&next_stmt_id.to_string()),
2378 );
2379 }
2380 BuildersOrCallback::Callback(_, node_callback) => {
2381 node_callback(self, next_stmt_id);
2382 }
2383 }
2384
2385 tee_ident
2386 };
2387
2388 *next_stmt_id += 1;
2392 ret_ident
2393 }
2394
2395 HydroNode::Chain { first, second, .. } => {
2396 let first_ident =
2397 first.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2398 let second_ident =
2399 second.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2400
2401 let chain_ident =
2402 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2403
2404 match builders_or_callback {
2405 BuildersOrCallback::Builders(graph_builders) => {
2406 let builder = graph_builders.get_dfir_mut(&out_location);
2407 builder.add_dfir(
2408 parse_quote! {
2409 #chain_ident = chain();
2410 #first_ident -> [0]#chain_ident;
2411 #second_ident -> [1]#chain_ident;
2412 },
2413 None,
2414 Some(&next_stmt_id.to_string()),
2415 );
2416 }
2417 BuildersOrCallback::Callback(_, node_callback) => {
2418 node_callback(self, next_stmt_id);
2419 }
2420 }
2421
2422 *next_stmt_id += 1;
2423
2424 chain_ident
2425 }
2426
2427 HydroNode::ChainFirst { first, second, .. } => {
2428 let first_ident =
2429 first.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2430 let second_ident =
2431 second.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2432
2433 let chain_ident =
2434 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2435
2436 match builders_or_callback {
2437 BuildersOrCallback::Builders(graph_builders) => {
2438 let builder = graph_builders.get_dfir_mut(&out_location);
2439 builder.add_dfir(
2440 parse_quote! {
2441 #chain_ident = chain_first_n(1);
2442 #first_ident -> [0]#chain_ident;
2443 #second_ident -> [1]#chain_ident;
2444 },
2445 None,
2446 Some(&next_stmt_id.to_string()),
2447 );
2448 }
2449 BuildersOrCallback::Callback(_, node_callback) => {
2450 node_callback(self, next_stmt_id);
2451 }
2452 }
2453
2454 *next_stmt_id += 1;
2455
2456 chain_ident
2457 }
2458
2459 HydroNode::CrossSingleton { left, right, .. } => {
2460 let left_ident =
2461 left.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2462 let right_ident =
2463 right.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2464
2465 let cross_ident =
2466 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2467
2468 match builders_or_callback {
2469 BuildersOrCallback::Builders(graph_builders) => {
2470 let builder = graph_builders.get_dfir_mut(&out_location);
2471 builder.add_dfir(
2472 parse_quote! {
2473 #cross_ident = cross_singleton();
2474 #left_ident -> [input]#cross_ident;
2475 #right_ident -> [single]#cross_ident;
2476 },
2477 None,
2478 Some(&next_stmt_id.to_string()),
2479 );
2480 }
2481 BuildersOrCallback::Callback(_, node_callback) => {
2482 node_callback(self, next_stmt_id);
2483 }
2484 }
2485
2486 *next_stmt_id += 1;
2487
2488 cross_ident
2489 }
2490
2491 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2492 let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
2493 parse_quote!(cross_join_multiset)
2494 } else {
2495 parse_quote!(join_multiset)
2496 };
2497
2498 let (HydroNode::CrossProduct { left, right, .. }
2499 | HydroNode::Join { left, right, .. }) = self
2500 else {
2501 unreachable!()
2502 };
2503
2504 let is_top_level = left.metadata().location_kind.is_top_level()
2505 && right.metadata().location_kind.is_top_level();
2506 let (left_inner, left_lifetime) = if left.metadata().location_kind.is_top_level() {
2507 (left, quote!('static))
2508 } else {
2509 (left, quote!('tick))
2510 };
2511
2512 let (right_inner, right_lifetime) = if right.metadata().location_kind.is_top_level()
2513 {
2514 (right, quote!('static))
2515 } else {
2516 (right, quote!('tick))
2517 };
2518
2519 let left_ident =
2520 left_inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2521 let right_ident =
2522 right_inner.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2523
2524 let stream_ident =
2525 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2526
2527 match builders_or_callback {
2528 BuildersOrCallback::Builders(graph_builders) => {
2529 let builder = graph_builders.get_dfir_mut(&out_location);
2530 builder.add_dfir(
2531 if is_top_level {
2532 parse_quote! {
2535 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2536 #left_ident -> [0]#stream_ident;
2537 #right_ident -> [1]#stream_ident;
2538 }
2539 } else {
2540 parse_quote! {
2541 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2542 #left_ident -> [0]#stream_ident;
2543 #right_ident -> [1]#stream_ident;
2544 }
2545 }
2546 ,
2547 None,
2548 Some(&next_stmt_id.to_string()),
2549 );
2550 }
2551 BuildersOrCallback::Callback(_, node_callback) => {
2552 node_callback(self, next_stmt_id);
2553 }
2554 }
2555
2556 *next_stmt_id += 1;
2557
2558 stream_ident
2559 }
2560
2561 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2562 let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
2563 parse_quote!(difference)
2564 } else {
2565 parse_quote!(anti_join)
2566 };
2567
2568 let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
2569 self
2570 else {
2571 unreachable!()
2572 };
2573
2574 let (neg, neg_lifetime) = if neg.metadata().location_kind.is_top_level() {
2575 (neg, quote!('static))
2576 } else {
2577 (neg, quote!('tick))
2578 };
2579
2580 let pos_ident = pos.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2581 let neg_ident = neg.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2582
2583 let stream_ident =
2584 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2585
2586 match builders_or_callback {
2587 BuildersOrCallback::Builders(graph_builders) => {
2588 let builder = graph_builders.get_dfir_mut(&out_location);
2589 builder.add_dfir(
2590 parse_quote! {
2591 #stream_ident = #operator::<'tick, #neg_lifetime>();
2592 #pos_ident -> [pos]#stream_ident;
2593 #neg_ident -> [neg]#stream_ident;
2594 },
2595 None,
2596 Some(&next_stmt_id.to_string()),
2597 );
2598 }
2599 BuildersOrCallback::Callback(_, node_callback) => {
2600 node_callback(self, next_stmt_id);
2601 }
2602 }
2603
2604 *next_stmt_id += 1;
2605
2606 stream_ident
2607 }
2608
2609 HydroNode::ResolveFutures { input, .. } => {
2610 let input_ident =
2611 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2612
2613 let futures_ident =
2614 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2615
2616 match builders_or_callback {
2617 BuildersOrCallback::Builders(graph_builders) => {
2618 let builder = graph_builders.get_dfir_mut(&out_location);
2619 builder.add_dfir(
2620 parse_quote! {
2621 #futures_ident = #input_ident -> resolve_futures();
2622 },
2623 None,
2624 Some(&next_stmt_id.to_string()),
2625 );
2626 }
2627 BuildersOrCallback::Callback(_, node_callback) => {
2628 node_callback(self, next_stmt_id);
2629 }
2630 }
2631
2632 *next_stmt_id += 1;
2633
2634 futures_ident
2635 }
2636
2637 HydroNode::ResolveFuturesOrdered { input, .. } => {
2638 let input_ident =
2639 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2640
2641 let futures_ident =
2642 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2643
2644 match builders_or_callback {
2645 BuildersOrCallback::Builders(graph_builders) => {
2646 let builder = graph_builders.get_dfir_mut(&out_location);
2647 builder.add_dfir(
2648 parse_quote! {
2649 #futures_ident = #input_ident -> resolve_futures_ordered();
2650 },
2651 None,
2652 Some(&next_stmt_id.to_string()),
2653 );
2654 }
2655 BuildersOrCallback::Callback(_, node_callback) => {
2656 node_callback(self, next_stmt_id);
2657 }
2658 }
2659
2660 *next_stmt_id += 1;
2661
2662 futures_ident
2663 }
2664
2665 HydroNode::Map { f, input, .. } => {
2666 let input_ident =
2667 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2668
2669 let map_ident =
2670 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2671
2672 match builders_or_callback {
2673 BuildersOrCallback::Builders(graph_builders) => {
2674 let builder = graph_builders.get_dfir_mut(&out_location);
2675 builder.add_dfir(
2676 parse_quote! {
2677 #map_ident = #input_ident -> map(#f);
2678 },
2679 None,
2680 Some(&next_stmt_id.to_string()),
2681 );
2682 }
2683 BuildersOrCallback::Callback(_, node_callback) => {
2684 node_callback(self, next_stmt_id);
2685 }
2686 }
2687
2688 *next_stmt_id += 1;
2689
2690 map_ident
2691 }
2692
2693 HydroNode::FlatMap { f, input, .. } => {
2694 let input_ident =
2695 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2696
2697 let flat_map_ident =
2698 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2699
2700 match builders_or_callback {
2701 BuildersOrCallback::Builders(graph_builders) => {
2702 let builder = graph_builders.get_dfir_mut(&out_location);
2703 builder.add_dfir(
2704 parse_quote! {
2705 #flat_map_ident = #input_ident -> flat_map(#f);
2706 },
2707 None,
2708 Some(&next_stmt_id.to_string()),
2709 );
2710 }
2711 BuildersOrCallback::Callback(_, node_callback) => {
2712 node_callback(self, next_stmt_id);
2713 }
2714 }
2715
2716 *next_stmt_id += 1;
2717
2718 flat_map_ident
2719 }
2720
2721 HydroNode::Filter { f, input, .. } => {
2722 let input_ident =
2723 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2724
2725 let filter_ident =
2726 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2727
2728 match builders_or_callback {
2729 BuildersOrCallback::Builders(graph_builders) => {
2730 let builder = graph_builders.get_dfir_mut(&out_location);
2731 builder.add_dfir(
2732 parse_quote! {
2733 #filter_ident = #input_ident -> filter(#f);
2734 },
2735 None,
2736 Some(&next_stmt_id.to_string()),
2737 );
2738 }
2739 BuildersOrCallback::Callback(_, node_callback) => {
2740 node_callback(self, next_stmt_id);
2741 }
2742 }
2743
2744 *next_stmt_id += 1;
2745
2746 filter_ident
2747 }
2748
2749 HydroNode::FilterMap { f, input, .. } => {
2750 let input_ident =
2751 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2752
2753 let filter_map_ident =
2754 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2755
2756 match builders_or_callback {
2757 BuildersOrCallback::Builders(graph_builders) => {
2758 let builder = graph_builders.get_dfir_mut(&out_location);
2759 builder.add_dfir(
2760 parse_quote! {
2761 #filter_map_ident = #input_ident -> filter_map(#f);
2762 },
2763 None,
2764 Some(&next_stmt_id.to_string()),
2765 );
2766 }
2767 BuildersOrCallback::Callback(_, node_callback) => {
2768 node_callback(self, next_stmt_id);
2769 }
2770 }
2771
2772 *next_stmt_id += 1;
2773
2774 filter_map_ident
2775 }
2776
2777 HydroNode::Sort { input, .. } => {
2778 let input_ident =
2779 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2780
2781 let sort_ident =
2782 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2783
2784 match builders_or_callback {
2785 BuildersOrCallback::Builders(graph_builders) => {
2786 let builder = graph_builders.get_dfir_mut(&out_location);
2787 builder.add_dfir(
2788 parse_quote! {
2789 #sort_ident = #input_ident -> sort();
2790 },
2791 None,
2792 Some(&next_stmt_id.to_string()),
2793 );
2794 }
2795 BuildersOrCallback::Callback(_, node_callback) => {
2796 node_callback(self, next_stmt_id);
2797 }
2798 }
2799
2800 *next_stmt_id += 1;
2801
2802 sort_ident
2803 }
2804
2805 HydroNode::DeferTick { input, .. } => {
2806 let input_ident =
2807 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2808
2809 let defer_tick_ident =
2810 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2811
2812 match builders_or_callback {
2813 BuildersOrCallback::Builders(graph_builders) => {
2814 let builder = graph_builders.get_dfir_mut(&out_location);
2815 builder.add_dfir(
2816 parse_quote! {
2817 #defer_tick_ident = #input_ident -> defer_tick_lazy();
2818 },
2819 None,
2820 Some(&next_stmt_id.to_string()),
2821 );
2822 }
2823 BuildersOrCallback::Callback(_, node_callback) => {
2824 node_callback(self, next_stmt_id);
2825 }
2826 }
2827
2828 *next_stmt_id += 1;
2829
2830 defer_tick_ident
2831 }
2832
2833 HydroNode::Enumerate { input, .. } => {
2834 let input_ident =
2835 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2836
2837 let enumerate_ident =
2838 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2839
2840 match builders_or_callback {
2841 BuildersOrCallback::Builders(graph_builders) => {
2842 let builder = graph_builders.get_dfir_mut(&out_location);
2843 let lifetime = if input.metadata().location_kind.is_top_level() {
2844 quote!('static)
2845 } else {
2846 quote!('tick)
2847 };
2848 builder.add_dfir(
2849 parse_quote! {
2850 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2851 },
2852 None,
2853 Some(&next_stmt_id.to_string()),
2854 );
2855 }
2856 BuildersOrCallback::Callback(_, node_callback) => {
2857 node_callback(self, next_stmt_id);
2858 }
2859 }
2860
2861 *next_stmt_id += 1;
2862
2863 enumerate_ident
2864 }
2865
2866 HydroNode::Inspect { f, input, .. } => {
2867 let input_ident =
2868 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2869
2870 let inspect_ident =
2871 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2872
2873 match builders_or_callback {
2874 BuildersOrCallback::Builders(graph_builders) => {
2875 let builder = graph_builders.get_dfir_mut(&out_location);
2876 builder.add_dfir(
2877 parse_quote! {
2878 #inspect_ident = #input_ident -> inspect(#f);
2879 },
2880 None,
2881 Some(&next_stmt_id.to_string()),
2882 );
2883 }
2884 BuildersOrCallback::Callback(_, node_callback) => {
2885 node_callback(self, next_stmt_id);
2886 }
2887 }
2888
2889 *next_stmt_id += 1;
2890
2891 inspect_ident
2892 }
2893
2894 HydroNode::Unique { input, .. } => {
2895 let input_ident =
2896 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2897
2898 let unique_ident =
2899 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2900
2901 match builders_or_callback {
2902 BuildersOrCallback::Builders(graph_builders) => {
2903 let builder = graph_builders.get_dfir_mut(&out_location);
2904 let lifetime = if input.metadata().location_kind.is_top_level() {
2905 quote!('static)
2906 } else {
2907 quote!('tick)
2908 };
2909
2910 builder.add_dfir(
2911 parse_quote! {
2912 #unique_ident = #input_ident -> unique::<#lifetime>();
2913 },
2914 None,
2915 Some(&next_stmt_id.to_string()),
2916 );
2917 }
2918 BuildersOrCallback::Callback(_, node_callback) => {
2919 node_callback(self, next_stmt_id);
2920 }
2921 }
2922
2923 *next_stmt_id += 1;
2924
2925 unique_ident
2926 }
2927
2928 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2929 let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2930 parse_quote!(fold)
2931 } else if matches!(self, HydroNode::Scan { .. }) {
2932 parse_quote!(scan)
2933 } else {
2934 parse_quote!(fold_keyed)
2935 };
2936
2937 let (HydroNode::Fold { input, .. }
2938 | HydroNode::FoldKeyed { input, .. }
2939 | HydroNode::Scan { input, .. }) = self
2940 else {
2941 unreachable!()
2942 };
2943
2944 let (input, lifetime) = if input.metadata().location_kind.is_top_level() {
2945 (input, quote!('static))
2946 } else {
2947 (input, quote!('tick))
2948 };
2949
2950 let input_ident =
2951 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
2952
2953 let (HydroNode::Fold { init, acc, .. }
2954 | HydroNode::FoldKeyed { init, acc, .. }
2955 | HydroNode::Scan { init, acc, .. }) = &*self
2956 else {
2957 unreachable!()
2958 };
2959
2960 let fold_ident =
2961 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2962
2963 match builders_or_callback {
2964 BuildersOrCallback::Builders(graph_builders) => {
2965 if matches!(self, HydroNode::Fold { .. })
2966 && self.metadata().location_kind.is_top_level()
2967 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
2968 && graph_builders.singleton_intermediates()
2969 {
2970 let builder = graph_builders.get_dfir_mut(&out_location);
2971
2972 let acc: syn::Expr = parse_quote!({
2973 let mut __inner = #acc;
2974 move |__state, __value| {
2975 __inner(__state, __value);
2976 Some(__state.clone())
2977 }
2978 });
2979
2980 builder.add_dfir(
2981 parse_quote! {
2982 source_iter([(#init)()]) -> [0]#fold_ident;
2983 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
2984 #fold_ident = chain();
2985 },
2986 None,
2987 Some(&next_stmt_id.to_string()),
2988 );
2989 } else if matches!(self, HydroNode::FoldKeyed { .. })
2990 && self.metadata().location_kind.is_top_level()
2991 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
2992 && graph_builders.singleton_intermediates()
2993 {
2994 let builder = graph_builders.get_dfir_mut(&out_location);
2995
2996 let acc: syn::Expr = parse_quote!({
2997 let mut __init = #init;
2998 let mut __inner = #acc;
2999 move |__state, __kv: (_, _)| {
3000 let __state = __state
3002 .entry(::std::clone::Clone::clone(&__kv.0))
3003 .or_insert_with(|| (__init)());
3004 __inner(__state, __kv.1);
3005 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3006 }
3007 });
3008
3009 builder.add_dfir(
3010 parse_quote! {
3011 source_iter([(#init)()]) -> [0]#fold_ident;
3012 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3013 },
3014 None,
3015 Some(&next_stmt_id.to_string()),
3016 );
3017 } else {
3018 let builder = graph_builders.get_dfir_mut(&out_location);
3019 builder.add_dfir(
3020 parse_quote! {
3021 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3022 },
3023 None,
3024 Some(&next_stmt_id.to_string()),
3025 );
3026 }
3027 }
3028 BuildersOrCallback::Callback(_, node_callback) => {
3029 node_callback(self, next_stmt_id);
3030 }
3031 }
3032
3033 *next_stmt_id += 1;
3034
3035 fold_ident
3036 }
3037
3038 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3039 let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
3040 parse_quote!(reduce)
3041 } else {
3042 parse_quote!(reduce_keyed)
3043 };
3044
3045 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = self
3046 else {
3047 unreachable!()
3048 };
3049
3050 let (input, lifetime) = if input.metadata().location_kind.is_top_level() {
3051 (input, quote!('static))
3052 } else {
3053 (input, quote!('tick))
3054 };
3055
3056 let input_ident =
3057 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3058
3059 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*self
3060 else {
3061 unreachable!()
3062 };
3063
3064 let reduce_ident =
3065 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3066
3067 match builders_or_callback {
3068 BuildersOrCallback::Builders(graph_builders) => {
3069 if matches!(self, HydroNode::Reduce { .. })
3070 && self.metadata().location_kind.is_top_level()
3071 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3072 && graph_builders.singleton_intermediates()
3073 {
3074 todo!(
3075 "Reduce with optional intermediates is not yet supported in simulator"
3076 );
3077 } else if matches!(self, HydroNode::ReduceKeyed { .. })
3078 && self.metadata().location_kind.is_top_level()
3079 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3080 && graph_builders.singleton_intermediates()
3081 {
3082 todo!();
3083 } else {
3084 let builder = graph_builders.get_dfir_mut(&out_location);
3085 builder.add_dfir(
3086 parse_quote! {
3087 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3088 },
3089 None,
3090 Some(&next_stmt_id.to_string()),
3091 );
3092 }
3093 }
3094 BuildersOrCallback::Callback(_, node_callback) => {
3095 node_callback(self, next_stmt_id);
3096 }
3097 }
3098
3099 *next_stmt_id += 1;
3100
3101 reduce_ident
3102 }
3103
3104 HydroNode::ReduceKeyedWatermark {
3105 f,
3106 input,
3107 watermark,
3108 ..
3109 } => {
3110 let (input, lifetime) = if input.metadata().location_kind.is_top_level() {
3111 (input, quote!('static))
3112 } else {
3113 (input, quote!('tick))
3114 };
3115
3116 let input_ident =
3117 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3118
3119 let watermark_ident =
3120 watermark.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3121
3122 let chain_ident = syn::Ident::new(
3123 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3124 Span::call_site(),
3125 );
3126
3127 let fold_ident =
3128 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3129
3130 match builders_or_callback {
3131 BuildersOrCallback::Builders(graph_builders) => {
3132 let builder = graph_builders.get_dfir_mut(&out_location);
3133 builder.add_dfir(
3138 parse_quote! {
3139 #chain_ident = chain();
3140 #input_ident
3141 -> map(|x| (Some(x), None))
3142 -> [0]#chain_ident;
3143 #watermark_ident
3144 -> map(|watermark| (None, Some(watermark)))
3145 -> [1]#chain_ident;
3146
3147 #fold_ident = #chain_ident
3148 -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3149 let __reduce_keyed_fn = #f;
3150 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3151 if let Some((k, v)) = opt_payload {
3152 if let Some(curr_watermark) = *opt_curr_watermark {
3153 if k <= curr_watermark {
3154 return;
3155 }
3156 }
3157 match map.entry(k) {
3158 ::std::collections::hash_map::Entry::Vacant(e) => {
3159 e.insert(v);
3160 }
3161 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3162 __reduce_keyed_fn(e.get_mut(), v);
3163 }
3164 }
3165 } else {
3166 let watermark = opt_watermark.unwrap();
3167 if let Some(curr_watermark) = *opt_curr_watermark {
3168 if watermark <= curr_watermark {
3169 return;
3170 }
3171 }
3172 *opt_curr_watermark = opt_watermark;
3173 map.retain(|k, _| *k > watermark);
3174 }
3175 }
3176 })
3177 -> flat_map(|(map, _curr_watermark)| map);
3178 },
3179 None,
3180 Some(&next_stmt_id.to_string()),
3181 );
3182 }
3183 BuildersOrCallback::Callback(_, node_callback) => {
3184 node_callback(self, next_stmt_id);
3185 }
3186 }
3187
3188 *next_stmt_id += 1;
3189
3190 fold_ident
3191 }
3192
3193 HydroNode::Network {
3194 serialize_fn: serialize_pipeline,
3195 instantiate_fn,
3196 deserialize_fn: deserialize_pipeline,
3197 input,
3198 ..
3199 } => {
3200 let input_ident =
3201 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3202
3203 let receiver_stream_ident =
3204 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3205
3206 match builders_or_callback {
3207 BuildersOrCallback::Builders(graph_builders) => {
3208 let (sink_expr, source_expr) = match instantiate_fn {
3209 DebugInstantiate::Building => (
3210 syn::parse_quote!(DUMMY_SINK),
3211 syn::parse_quote!(DUMMY_SOURCE),
3212 ),
3213
3214 DebugInstantiate::Finalized(finalized) => {
3215 (finalized.sink.clone(), finalized.source.clone())
3216 }
3217 };
3218
3219 graph_builders.create_network(
3220 &input.metadata().location_kind,
3221 &out_location,
3222 input_ident,
3223 &receiver_stream_ident,
3224 serialize_pipeline,
3225 sink_expr,
3226 source_expr,
3227 deserialize_pipeline,
3228 *next_stmt_id,
3229 );
3230 }
3231 BuildersOrCallback::Callback(_, node_callback) => {
3232 node_callback(self, next_stmt_id);
3233 }
3234 }
3235
3236 *next_stmt_id += 1;
3237
3238 receiver_stream_ident
3239 }
3240
3241 HydroNode::ExternalInput {
3242 instantiate_fn,
3243 deserialize_fn: deserialize_pipeline,
3244 ..
3245 } => {
3246 let receiver_stream_ident =
3247 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3248
3249 match builders_or_callback {
3250 BuildersOrCallback::Builders(graph_builders) => {
3251 let (_, source_expr) = match instantiate_fn {
3252 DebugInstantiate::Building => (
3253 syn::parse_quote!(DUMMY_SINK),
3254 syn::parse_quote!(DUMMY_SOURCE),
3255 ),
3256
3257 DebugInstantiate::Finalized(finalized) => {
3258 (finalized.sink.clone(), finalized.source.clone())
3259 }
3260 };
3261
3262 graph_builders.create_external_source(
3263 &out_location,
3264 source_expr,
3265 &receiver_stream_ident,
3266 deserialize_pipeline,
3267 *next_stmt_id,
3268 );
3269 }
3270 BuildersOrCallback::Callback(_, node_callback) => {
3271 node_callback(self, next_stmt_id);
3272 }
3273 }
3274
3275 *next_stmt_id += 1;
3276
3277 receiver_stream_ident
3278 }
3279
3280 HydroNode::Counter {
3281 tag,
3282 duration,
3283 prefix,
3284 input,
3285 ..
3286 } => {
3287 let input_ident =
3288 input.emit_core::<D>(builders_or_callback, built_tees, next_stmt_id);
3289
3290 let counter_ident =
3291 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3292
3293 match builders_or_callback {
3294 BuildersOrCallback::Builders(graph_builders) => {
3295 let builder = graph_builders.get_dfir_mut(&out_location);
3296 builder.add_dfir(
3297 parse_quote! {
3298 #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3299 },
3300 None,
3301 Some(&next_stmt_id.to_string()),
3302 );
3303 }
3304 BuildersOrCallback::Callback(_, node_callback) => {
3305 node_callback(self, next_stmt_id);
3306 }
3307 }
3308
3309 *next_stmt_id += 1;
3310
3311 counter_ident
3312 }
3313 }
3314 }
3315
3316 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3317 match self {
3318 HydroNode::Placeholder => {
3319 panic!()
3320 }
3321 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3322 HydroNode::Source { source, .. } => match source {
3323 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3324 HydroSource::ExternalNetwork()
3325 | HydroSource::Spin()
3326 | HydroSource::ClusterMembers(_) => {} },
3328 HydroNode::SingletonSource { value, .. } => {
3329 transform(value);
3330 }
3331 HydroNode::CycleSource { .. }
3332 | HydroNode::Tee { .. }
3333 | HydroNode::YieldConcat { .. }
3334 | HydroNode::BeginAtomic { .. }
3335 | HydroNode::EndAtomic { .. }
3336 | HydroNode::Batch { .. }
3337 | HydroNode::Chain { .. }
3338 | HydroNode::ChainFirst { .. }
3339 | HydroNode::CrossProduct { .. }
3340 | HydroNode::CrossSingleton { .. }
3341 | HydroNode::ResolveFutures { .. }
3342 | HydroNode::ResolveFuturesOrdered { .. }
3343 | HydroNode::Join { .. }
3344 | HydroNode::Difference { .. }
3345 | HydroNode::AntiJoin { .. }
3346 | HydroNode::DeferTick { .. }
3347 | HydroNode::Enumerate { .. }
3348 | HydroNode::Unique { .. }
3349 | HydroNode::Sort { .. } => {}
3350 HydroNode::Map { f, .. }
3351 | HydroNode::FlatMap { f, .. }
3352 | HydroNode::Filter { f, .. }
3353 | HydroNode::FilterMap { f, .. }
3354 | HydroNode::Inspect { f, .. }
3355 | HydroNode::Reduce { f, .. }
3356 | HydroNode::ReduceKeyed { f, .. }
3357 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3358 transform(f);
3359 }
3360 HydroNode::Fold { init, acc, .. }
3361 | HydroNode::Scan { init, acc, .. }
3362 | HydroNode::FoldKeyed { init, acc, .. } => {
3363 transform(init);
3364 transform(acc);
3365 }
3366 HydroNode::Network {
3367 serialize_fn,
3368 deserialize_fn,
3369 ..
3370 } => {
3371 if let Some(serialize_fn) = serialize_fn {
3372 transform(serialize_fn);
3373 }
3374 if let Some(deserialize_fn) = deserialize_fn {
3375 transform(deserialize_fn);
3376 }
3377 }
3378 HydroNode::ExternalInput { deserialize_fn, .. } => {
3379 if let Some(deserialize_fn) = deserialize_fn {
3380 transform(deserialize_fn);
3381 }
3382 }
3383 HydroNode::Counter { duration, .. } => {
3384 transform(duration);
3385 }
3386 }
3387 }
3388
3389 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3390 &self.metadata().op
3391 }
3392
3393 pub fn metadata(&self) -> &HydroIrMetadata {
3394 match self {
3395 HydroNode::Placeholder => {
3396 panic!()
3397 }
3398 HydroNode::Cast { metadata, .. } => metadata,
3399 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3400 HydroNode::Source { metadata, .. } => metadata,
3401 HydroNode::SingletonSource { metadata, .. } => metadata,
3402 HydroNode::CycleSource { metadata, .. } => metadata,
3403 HydroNode::Tee { metadata, .. } => metadata,
3404 HydroNode::YieldConcat { metadata, .. } => metadata,
3405 HydroNode::BeginAtomic { metadata, .. } => metadata,
3406 HydroNode::EndAtomic { metadata, .. } => metadata,
3407 HydroNode::Batch { metadata, .. } => metadata,
3408 HydroNode::Chain { metadata, .. } => metadata,
3409 HydroNode::ChainFirst { metadata, .. } => metadata,
3410 HydroNode::CrossProduct { metadata, .. } => metadata,
3411 HydroNode::CrossSingleton { metadata, .. } => metadata,
3412 HydroNode::Join { metadata, .. } => metadata,
3413 HydroNode::Difference { metadata, .. } => metadata,
3414 HydroNode::AntiJoin { metadata, .. } => metadata,
3415 HydroNode::ResolveFutures { metadata, .. } => metadata,
3416 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3417 HydroNode::Map { metadata, .. } => metadata,
3418 HydroNode::FlatMap { metadata, .. } => metadata,
3419 HydroNode::Filter { metadata, .. } => metadata,
3420 HydroNode::FilterMap { metadata, .. } => metadata,
3421 HydroNode::DeferTick { metadata, .. } => metadata,
3422 HydroNode::Enumerate { metadata, .. } => metadata,
3423 HydroNode::Inspect { metadata, .. } => metadata,
3424 HydroNode::Unique { metadata, .. } => metadata,
3425 HydroNode::Sort { metadata, .. } => metadata,
3426 HydroNode::Scan { metadata, .. } => metadata,
3427 HydroNode::Fold { metadata, .. } => metadata,
3428 HydroNode::FoldKeyed { metadata, .. } => metadata,
3429 HydroNode::Reduce { metadata, .. } => metadata,
3430 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3431 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3432 HydroNode::ExternalInput { metadata, .. } => metadata,
3433 HydroNode::Network { metadata, .. } => metadata,
3434 HydroNode::Counter { metadata, .. } => metadata,
3435 }
3436 }
3437
3438 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3439 &mut self.metadata_mut().op
3440 }
3441
3442 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3443 match self {
3444 HydroNode::Placeholder => {
3445 panic!()
3446 }
3447 HydroNode::Cast { metadata, .. } => metadata,
3448 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3449 HydroNode::Source { metadata, .. } => metadata,
3450 HydroNode::SingletonSource { metadata, .. } => metadata,
3451 HydroNode::CycleSource { metadata, .. } => metadata,
3452 HydroNode::Tee { metadata, .. } => metadata,
3453 HydroNode::YieldConcat { metadata, .. } => metadata,
3454 HydroNode::BeginAtomic { metadata, .. } => metadata,
3455 HydroNode::EndAtomic { metadata, .. } => metadata,
3456 HydroNode::Batch { metadata, .. } => metadata,
3457 HydroNode::Chain { metadata, .. } => metadata,
3458 HydroNode::ChainFirst { metadata, .. } => metadata,
3459 HydroNode::CrossProduct { metadata, .. } => metadata,
3460 HydroNode::CrossSingleton { metadata, .. } => metadata,
3461 HydroNode::Join { metadata, .. } => metadata,
3462 HydroNode::Difference { metadata, .. } => metadata,
3463 HydroNode::AntiJoin { metadata, .. } => metadata,
3464 HydroNode::ResolveFutures { metadata, .. } => metadata,
3465 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3466 HydroNode::Map { metadata, .. } => metadata,
3467 HydroNode::FlatMap { metadata, .. } => metadata,
3468 HydroNode::Filter { metadata, .. } => metadata,
3469 HydroNode::FilterMap { metadata, .. } => metadata,
3470 HydroNode::DeferTick { metadata, .. } => metadata,
3471 HydroNode::Enumerate { metadata, .. } => metadata,
3472 HydroNode::Inspect { metadata, .. } => metadata,
3473 HydroNode::Unique { metadata, .. } => metadata,
3474 HydroNode::Sort { metadata, .. } => metadata,
3475 HydroNode::Scan { metadata, .. } => metadata,
3476 HydroNode::Fold { metadata, .. } => metadata,
3477 HydroNode::FoldKeyed { metadata, .. } => metadata,
3478 HydroNode::Reduce { metadata, .. } => metadata,
3479 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3480 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3481 HydroNode::ExternalInput { metadata, .. } => metadata,
3482 HydroNode::Network { metadata, .. } => metadata,
3483 HydroNode::Counter { metadata, .. } => metadata,
3484 }
3485 }
3486
3487 pub fn input(&self) -> Vec<&HydroNode> {
3488 match self {
3489 HydroNode::Placeholder => {
3490 panic!()
3491 }
3492 HydroNode::Source { .. }
3493 | HydroNode::SingletonSource { .. }
3494 | HydroNode::ExternalInput { .. }
3495 | HydroNode::CycleSource { .. }
3496 | HydroNode::Tee { .. } => {
3497 vec![]
3499 }
3500 HydroNode::Cast { inner, .. }
3501 | HydroNode::ObserveNonDet { inner, .. }
3502 | HydroNode::YieldConcat { inner, .. }
3503 | HydroNode::BeginAtomic { inner, .. }
3504 | HydroNode::EndAtomic { inner, .. }
3505 | HydroNode::Batch { inner, .. } => {
3506 vec![inner]
3507 }
3508 HydroNode::Chain { first, second, .. } => {
3509 vec![first, second]
3510 }
3511 HydroNode::ChainFirst { first, second, .. } => {
3512 vec![first, second]
3513 }
3514 HydroNode::CrossProduct { left, right, .. }
3515 | HydroNode::CrossSingleton { left, right, .. }
3516 | HydroNode::Join { left, right, .. } => {
3517 vec![left, right]
3518 }
3519 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3520 vec![pos, neg]
3521 }
3522 HydroNode::Map { input, .. }
3523 | HydroNode::FlatMap { input, .. }
3524 | HydroNode::Filter { input, .. }
3525 | HydroNode::FilterMap { input, .. }
3526 | HydroNode::Sort { input, .. }
3527 | HydroNode::DeferTick { input, .. }
3528 | HydroNode::Enumerate { input, .. }
3529 | HydroNode::Inspect { input, .. }
3530 | HydroNode::Unique { input, .. }
3531 | HydroNode::Network { input, .. }
3532 | HydroNode::Counter { input, .. }
3533 | HydroNode::ResolveFutures { input, .. }
3534 | HydroNode::ResolveFuturesOrdered { input, .. }
3535 | HydroNode::Fold { input, .. }
3536 | HydroNode::FoldKeyed { input, .. }
3537 | HydroNode::Reduce { input, .. }
3538 | HydroNode::ReduceKeyed { input, .. }
3539 | HydroNode::Scan { input, .. } => {
3540 vec![input]
3541 }
3542 HydroNode::ReduceKeyedWatermark {
3543 input, watermark, ..
3544 } => {
3545 vec![input, watermark]
3546 }
3547 }
3548 }
3549
3550 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3551 self.input()
3552 .iter()
3553 .map(|input_node| input_node.metadata())
3554 .collect()
3555 }
3556
3557 pub fn print_root(&self) -> String {
3558 match self {
3559 HydroNode::Placeholder => {
3560 panic!()
3561 }
3562 HydroNode::Cast { .. } => "Cast()".to_string(),
3563 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_string(),
3564 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3565 HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3566 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3567 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3568 HydroNode::YieldConcat { .. } => "YieldConcat()".to_string(),
3569 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_string(),
3570 HydroNode::EndAtomic { .. } => "EndAtomic()".to_string(),
3571 HydroNode::Batch { .. } => "Batch()".to_string(),
3572 HydroNode::Chain { first, second, .. } => {
3573 format!("Chain({}, {})", first.print_root(), second.print_root())
3574 }
3575 HydroNode::ChainFirst { first, second, .. } => {
3576 format!(
3577 "ChainFirst({}, {})",
3578 first.print_root(),
3579 second.print_root()
3580 )
3581 }
3582 HydroNode::CrossProduct { left, right, .. } => {
3583 format!(
3584 "CrossProduct({}, {})",
3585 left.print_root(),
3586 right.print_root()
3587 )
3588 }
3589 HydroNode::CrossSingleton { left, right, .. } => {
3590 format!(
3591 "CrossSingleton({}, {})",
3592 left.print_root(),
3593 right.print_root()
3594 )
3595 }
3596 HydroNode::Join { left, right, .. } => {
3597 format!("Join({}, {})", left.print_root(), right.print_root())
3598 }
3599 HydroNode::Difference { pos, neg, .. } => {
3600 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3601 }
3602 HydroNode::AntiJoin { pos, neg, .. } => {
3603 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3604 }
3605 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
3606 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
3607 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3608 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3609 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3610 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3611 HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
3612 HydroNode::Enumerate { .. } => "Enumerate()".to_string(),
3613 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3614 HydroNode::Unique { .. } => "Unique()".to_string(),
3615 HydroNode::Sort { .. } => "Sort()".to_string(),
3616 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3617 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3618 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3619 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3620 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3621 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3622 HydroNode::Network { .. } => "Network()".to_string(),
3623 HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3624 HydroNode::Counter { tag, duration, .. } => {
3625 format!("Counter({:?}, {:?})", tag, duration)
3626 }
3627 }
3628 }
3629}
3630
/// Builds the sink/source pair and the connect callback for one network
/// channel between two locations.
///
/// The sender (`from_location`) and receiver (`to_location`) are resolved to
/// their instantiated nodes, one port is allocated on each through `D`, and
/// the matching `D::*_sink_source` / `D::*_connect` pair is invoked:
///
/// | sender  | receiver | deploy functions used |
/// |---------|----------|-----------------------|
/// | process | process  | `o2o_*`               |
/// | process | cluster  | `o2m_*`               |
/// | cluster | process  | `m2o_*`               |
/// | cluster | cluster  | `m2m_*`               |
///
/// Returns `(sink, source, connect_fn)`, where `sink` and `source` are the
/// expressions returned by `D::*_sink_source` and `connect_fn` is the boxed
/// callback returned by `D::*_connect` (presumably it establishes the actual
/// connection when invoked; confirm against the `Deploy` implementation).
///
/// # Panics
///
/// - If a process or cluster id referenced by a location is missing from
///   `processes` / `clusters`.
/// - If either location is a `LocationId::Tick` or `LocationId::Atomic`;
///   only processes and clusters are handled as network endpoints here.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    /// Returns the node registered under `id`, panicking with a message that
    /// names the missing `kind` ("process" or "cluster") when it is absent.
    fn instantiated<'m, T>(nodes: &'m HashMap<usize, T>, kind: &str, id: &usize) -> &'m T {
        nodes.get(id).unwrap_or_else(|| {
            panic!("A {} used in the graph was not instantiated: {}", kind, id)
        })
    }

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (LocationId::Process(from), LocationId::Process(to)) => {
            let from_node = instantiated(processes, "process", from).clone();
            let to_node = instantiated(processes, "process", to).clone();

            // Ports are allocated sender first, then receiver.
            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::o2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Process(from), LocationId::Cluster(to)) => {
            let from_node = instantiated(processes, "process", from).clone();
            let to_node = instantiated(clusters, "cluster", to).clone();

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::o2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Process(to)) => {
            let from_node = instantiated(clusters, "cluster", from).clone();
            let to_node = instantiated(processes, "process", to).clone();

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::m2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
            let from_node = instantiated(clusters, "cluster", from).clone();
            let to_node = instantiated(clusters, "cluster", to).clone();

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::m2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Tick and atomic locations are not handled as endpoints; fail loudly
        // with a message instead of an anonymous "explicit panic".
        (LocationId::Tick(_, _), _) | (LocationId::Atomic(_), _) => panic!(
            "Cannot instantiate a network whose sender is a tick or atomic location; \
             expected a process or cluster"
        ),
        (_, LocationId::Tick(_, _)) | (_, LocationId::Atomic(_)) => panic!(
            "Cannot instantiate a network whose receiver is a tick or atomic location; \
             expected a process or cluster"
        ),
    };
    (sink, source, connect_fn)
}
3737
// Unit tests for the memory footprint of the IR types and for the
// `simplify_q_macro` helper used when displaying quoted expressions.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins `size_of::<HydroNode>()` to 272 so that any change to the type's
    /// size fails this test and has to be acknowledged by updating the value.
    ///
    /// Ignored without the `build` feature because the expected size counts
    /// feature-gated fields.
    // NOTE(review): the literal presumably assumes a 64-bit target — confirm.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 272);
    }

    /// Pins `size_of::<HydroRoot>()` to 168, with the same feature gating as
    /// `hydro_node_size`.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 168);
    }

    /// A plain expression parsed from `"x + y"` must come back from
    /// `simplify_q_macro` equal to the input.
    #[test]
    fn test_simplify_q_macro_basic() {
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        // The input is cloned so the untouched original remains available for comparison.
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    /// Feeds the expression spliced from a real `q!` closure through
    /// `simplify_q_macro` and snapshot-tests the resulting token string.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        // Splice with the unit context `&()` to obtain the expression to simplify.
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        // The expected output lives in the stored snapshot, not inline here.
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Same as the previous test, but the quoted code is a block that binds a
    /// local before yielding a `move` closure, so the closure's `|` is not the
    /// first token of the quoted expression.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}