1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24#[cfg(feature = "build")]
25use crate::compile::deploy_provider::{Deploy, RegisterPort};
26use crate::location::NetworkHint;
27use crate::location::dynamic::LocationId;
28
29pub mod backtrace;
30use backtrace::Backtrace;
31
32#[derive(Clone, Hash)]
36pub struct DebugExpr(pub Box<syn::Expr>);
37
38impl From<syn::Expr> for DebugExpr {
39 fn from(expr: syn::Expr) -> Self {
40 Self(Box::new(expr))
41 }
42}
43
44impl Deref for DebugExpr {
45 type Target = syn::Expr;
46
47 fn deref(&self) -> &Self::Target {
48 &self.0
49 }
50}
51
52impl ToTokens for DebugExpr {
53 fn to_tokens(&self, tokens: &mut TokenStream) {
54 self.0.to_tokens(tokens);
55 }
56}
57
58impl Debug for DebugExpr {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 write!(f, "{}", self.0.to_token_stream())
61 }
62}
63
64impl Display for DebugExpr {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 let original = self.0.as_ref().clone();
67 let simplified = simplify_q_macro(original);
68
69 write!(f, "q!({})", quote::quote!(#simplified))
72 }
73}
74
75fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
77 let mut simplifier = QMacroSimplifier::new();
80 simplifier.visit_expr_mut(&mut expr);
81
82 if let Some(simplified) = simplifier.simplified_result {
84 simplified
85 } else {
86 expr
87 }
88}
89
90#[derive(Default)]
92pub struct QMacroSimplifier {
93 pub simplified_result: Option<syn::Expr>,
94}
95
96impl QMacroSimplifier {
97 pub fn new() -> Self {
98 Self::default()
99 }
100}
101
102impl VisitMut for QMacroSimplifier {
103 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
104 if self.simplified_result.is_some() {
106 return;
107 }
108
109 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
110 && self.is_stageleft_runtime_support_call(&path_expr.path)
112 && let Some(closure) = self.extract_closure_from_args(&call.args)
114 {
115 self.simplified_result = Some(closure);
116 return;
117 }
118
119 syn::visit_mut::visit_expr_mut(self, expr);
122 }
123}
124
125impl QMacroSimplifier {
126 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
127 if let Some(last_segment) = path.segments.last() {
129 let fn_name = last_segment.ident.to_string();
130 fn_name.contains("_type_hint")
132 && path.segments.len() > 2
133 && path.segments[0].ident == "stageleft"
134 && path.segments[1].ident == "runtime_support"
135 } else {
136 false
137 }
138 }
139
140 fn extract_closure_from_args(
141 &self,
142 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
143 ) -> Option<syn::Expr> {
144 for arg in args {
146 if let syn::Expr::Closure(_) = arg {
147 return Some(arg.clone());
148 }
149 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
151 return Some(closure_expr);
152 }
153 }
154 None
155 }
156
157 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
158 let mut visitor = ClosureFinder {
159 found_closure: None,
160 prefer_inner_blocks: true,
161 };
162 visitor.visit_expr(expr);
163 visitor.found_closure
164 }
165}
166
167struct ClosureFinder {
169 found_closure: Option<syn::Expr>,
170 prefer_inner_blocks: bool,
171}
172
173impl<'ast> Visit<'ast> for ClosureFinder {
174 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
175 if self.found_closure.is_some() {
177 return;
178 }
179
180 match expr {
181 syn::Expr::Closure(_) => {
182 self.found_closure = Some(expr.clone());
183 }
184 syn::Expr::Block(block) if self.prefer_inner_blocks => {
185 for stmt in &block.block.stmts {
187 if let syn::Stmt::Expr(stmt_expr, _) = stmt
188 && let syn::Expr::Block(_) = stmt_expr
189 {
190 let mut inner_visitor = ClosureFinder {
192 found_closure: None,
193 prefer_inner_blocks: false, };
195 inner_visitor.visit_expr(stmt_expr);
196 if inner_visitor.found_closure.is_some() {
197 self.found_closure = Some(stmt_expr.clone());
199 return;
200 }
201 }
202 }
203
204 visit::visit_expr(self, expr);
206
207 if self.found_closure.is_some() {
210 }
212 }
213 _ => {
214 visit::visit_expr(self, expr);
216 }
217 }
218 }
219}
220
221#[derive(Clone, PartialEq, Eq, Hash)]
225pub struct DebugType(pub Box<syn::Type>);
226
227impl From<syn::Type> for DebugType {
228 fn from(t: syn::Type) -> Self {
229 Self(Box::new(t))
230 }
231}
232
233impl Deref for DebugType {
234 type Target = syn::Type;
235
236 fn deref(&self) -> &Self::Target {
237 &self.0
238 }
239}
240
241impl ToTokens for DebugType {
242 fn to_tokens(&self, tokens: &mut TokenStream) {
243 self.0.to_tokens(tokens);
244 }
245}
246
247impl Debug for DebugType {
248 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
249 write!(f, "{}", self.0.to_token_stream())
250 }
251}
252
253pub enum DebugInstantiate {
254 Building,
255 Finalized(Box<DebugInstantiateFinalized>),
256}
257
258#[cfg_attr(
259 not(feature = "build"),
260 expect(
261 dead_code,
262 reason = "sink, source unused without `feature = \"build\"`."
263 )
264)]
265pub struct DebugInstantiateFinalized {
266 sink: syn::Expr,
267 source: syn::Expr,
268 connect_fn: Option<Box<dyn FnOnce()>>,
269}
270
271impl From<DebugInstantiateFinalized> for DebugInstantiate {
272 fn from(f: DebugInstantiateFinalized) -> Self {
273 Self::Finalized(Box::new(f))
274 }
275}
276
277impl Debug for DebugInstantiate {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 write!(f, "<network instantiate>")
280 }
281}
282
283impl Hash for DebugInstantiate {
284 fn hash<H: Hasher>(&self, _state: &mut H) {
285 }
287}
288
289impl Clone for DebugInstantiate {
290 fn clone(&self) -> Self {
291 match self {
292 DebugInstantiate::Building => DebugInstantiate::Building,
293 DebugInstantiate::Finalized(_) => {
294 panic!("DebugInstantiate::Finalized should not be cloned")
295 }
296 }
297 }
298}
299
300#[derive(Debug, Hash, Clone)]
302pub enum HydroSource {
303 Stream(DebugExpr),
304 ExternalNetwork(),
305 Iter(DebugExpr),
306 Spin(),
307}
308
309#[cfg(feature = "build")]
310pub trait DfirBuilder {
316 fn singleton_intermediates(&self) -> bool;
318
319 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
321
322 fn batch(
323 &mut self,
324 in_ident: syn::Ident,
325 in_location: &LocationId,
326 in_kind: &CollectionKind,
327 out_ident: &syn::Ident,
328 out_location: &LocationId,
329 op_meta: &HydroIrOpMetadata,
330 );
331 fn yield_from_tick(
332 &mut self,
333 in_ident: syn::Ident,
334 in_location: &LocationId,
335 in_kind: &CollectionKind,
336 out_ident: &syn::Ident,
337 out_location: &LocationId,
338 );
339
340 fn begin_atomic(
341 &mut self,
342 in_ident: syn::Ident,
343 in_location: &LocationId,
344 in_kind: &CollectionKind,
345 out_ident: &syn::Ident,
346 out_location: &LocationId,
347 op_meta: &HydroIrOpMetadata,
348 );
349 fn end_atomic(
350 &mut self,
351 in_ident: syn::Ident,
352 in_location: &LocationId,
353 in_kind: &CollectionKind,
354 out_ident: &syn::Ident,
355 );
356
357 fn observe_nondet(
358 &mut self,
359 trusted: bool,
360 location: &LocationId,
361 in_ident: syn::Ident,
362 in_kind: &CollectionKind,
363 out_ident: &syn::Ident,
364 out_kind: &CollectionKind,
365 );
366
367 #[expect(clippy::too_many_arguments, reason = "TODO")]
368 fn create_network(
369 &mut self,
370 from: &LocationId,
371 to: &LocationId,
372 input_ident: syn::Ident,
373 out_ident: &syn::Ident,
374 serialize: &Option<DebugExpr>,
375 sink: syn::Expr,
376 source: syn::Expr,
377 deserialize: &Option<DebugExpr>,
378 tag_id: usize,
379 );
380
381 fn create_external_source(
382 &mut self,
383 on: &LocationId,
384 source_expr: syn::Expr,
385 out_ident: &syn::Ident,
386 deserialize: &Option<DebugExpr>,
387 tag_id: usize,
388 );
389
390 fn create_external_output(
391 &mut self,
392 on: &LocationId,
393 sink_expr: syn::Expr,
394 input_ident: &syn::Ident,
395 serialize: &Option<DebugExpr>,
396 tag_id: usize,
397 );
398}
399
400#[cfg(feature = "build")]
401impl DfirBuilder for BTreeMap<usize, FlatGraphBuilder> {
402 fn singleton_intermediates(&self) -> bool {
403 false
404 }
405
406 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
407 self.entry(location.root().raw_id()).or_default()
408 }
409
410 fn batch(
411 &mut self,
412 in_ident: syn::Ident,
413 in_location: &LocationId,
414 _in_kind: &CollectionKind,
415 out_ident: &syn::Ident,
416 _out_location: &LocationId,
417 _op_meta: &HydroIrOpMetadata,
418 ) {
419 let builder = self.get_dfir_mut(in_location.root());
420 builder.add_dfir(
421 parse_quote! {
422 #out_ident = #in_ident;
423 },
424 None,
425 None,
426 );
427 }
428
429 fn yield_from_tick(
430 &mut self,
431 in_ident: syn::Ident,
432 in_location: &LocationId,
433 _in_kind: &CollectionKind,
434 out_ident: &syn::Ident,
435 _out_location: &LocationId,
436 ) {
437 let builder = self.get_dfir_mut(in_location.root());
438 builder.add_dfir(
439 parse_quote! {
440 #out_ident = #in_ident;
441 },
442 None,
443 None,
444 );
445 }
446
447 fn begin_atomic(
448 &mut self,
449 in_ident: syn::Ident,
450 in_location: &LocationId,
451 _in_kind: &CollectionKind,
452 out_ident: &syn::Ident,
453 _out_location: &LocationId,
454 _op_meta: &HydroIrOpMetadata,
455 ) {
456 let builder = self.get_dfir_mut(in_location.root());
457 builder.add_dfir(
458 parse_quote! {
459 #out_ident = #in_ident;
460 },
461 None,
462 None,
463 );
464 }
465
466 fn end_atomic(
467 &mut self,
468 in_ident: syn::Ident,
469 in_location: &LocationId,
470 _in_kind: &CollectionKind,
471 out_ident: &syn::Ident,
472 ) {
473 let builder = self.get_dfir_mut(in_location.root());
474 builder.add_dfir(
475 parse_quote! {
476 #out_ident = #in_ident;
477 },
478 None,
479 None,
480 );
481 }
482
483 fn observe_nondet(
484 &mut self,
485 _trusted: bool,
486 location: &LocationId,
487 in_ident: syn::Ident,
488 _in_kind: &CollectionKind,
489 out_ident: &syn::Ident,
490 _out_kind: &CollectionKind,
491 ) {
492 let builder = self.get_dfir_mut(location);
493 builder.add_dfir(
494 parse_quote! {
495 #out_ident = #in_ident;
496 },
497 None,
498 None,
499 );
500 }
501
502 fn create_network(
503 &mut self,
504 from: &LocationId,
505 to: &LocationId,
506 input_ident: syn::Ident,
507 out_ident: &syn::Ident,
508 serialize: &Option<DebugExpr>,
509 sink: syn::Expr,
510 source: syn::Expr,
511 deserialize: &Option<DebugExpr>,
512 tag_id: usize,
513 ) {
514 let sender_builder = self.get_dfir_mut(from);
515 if let Some(serialize_pipeline) = serialize {
516 sender_builder.add_dfir(
517 parse_quote! {
518 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
519 },
520 None,
521 Some(&format!("send{}", tag_id)),
523 );
524 } else {
525 sender_builder.add_dfir(
526 parse_quote! {
527 #input_ident -> dest_sink(#sink);
528 },
529 None,
530 Some(&format!("send{}", tag_id)),
531 );
532 }
533
534 let receiver_builder = self.get_dfir_mut(to);
535 if let Some(deserialize_pipeline) = deserialize {
536 receiver_builder.add_dfir(
537 parse_quote! {
538 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
539 },
540 None,
541 Some(&format!("recv{}", tag_id)),
542 );
543 } else {
544 receiver_builder.add_dfir(
545 parse_quote! {
546 #out_ident = source_stream(#source);
547 },
548 None,
549 Some(&format!("recv{}", tag_id)),
550 );
551 }
552 }
553
554 fn create_external_source(
555 &mut self,
556 on: &LocationId,
557 source_expr: syn::Expr,
558 out_ident: &syn::Ident,
559 deserialize: &Option<DebugExpr>,
560 tag_id: usize,
561 ) {
562 let receiver_builder = self.get_dfir_mut(on);
563 if let Some(deserialize_pipeline) = deserialize {
564 receiver_builder.add_dfir(
565 parse_quote! {
566 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
567 },
568 None,
569 Some(&format!("recv{}", tag_id)),
570 );
571 } else {
572 receiver_builder.add_dfir(
573 parse_quote! {
574 #out_ident = source_stream(#source_expr);
575 },
576 None,
577 Some(&format!("recv{}", tag_id)),
578 );
579 }
580 }
581
582 fn create_external_output(
583 &mut self,
584 on: &LocationId,
585 sink_expr: syn::Expr,
586 input_ident: &syn::Ident,
587 serialize: &Option<DebugExpr>,
588 tag_id: usize,
589 ) {
590 let sender_builder = self.get_dfir_mut(on);
591 if let Some(serialize_fn) = serialize {
592 sender_builder.add_dfir(
593 parse_quote! {
594 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
595 },
596 None,
597 Some(&format!("send{}", tag_id)),
599 );
600 } else {
601 sender_builder.add_dfir(
602 parse_quote! {
603 #input_ident -> dest_sink(#sink_expr);
604 },
605 None,
606 Some(&format!("send{}", tag_id)),
607 );
608 }
609 }
610}
611
612#[cfg(feature = "build")]
613pub enum BuildersOrCallback<'a, L, N>
614where
615 L: FnMut(&mut HydroRoot, &mut usize),
616 N: FnMut(&mut HydroNode, &mut usize),
617{
618 Builders(&'a mut dyn DfirBuilder),
619 Callback(L, N),
620}
621
622#[derive(Debug, Hash)]
626pub enum HydroRoot {
627 ForEach {
628 f: DebugExpr,
629 input: Box<HydroNode>,
630 op_metadata: HydroIrOpMetadata,
631 },
632 SendExternal {
633 to_external_id: usize,
634 to_key: usize,
635 to_many: bool,
636 unpaired: bool,
637 serialize_fn: Option<DebugExpr>,
638 instantiate_fn: DebugInstantiate,
639 input: Box<HydroNode>,
640 op_metadata: HydroIrOpMetadata,
641 },
642 DestSink {
643 sink: DebugExpr,
644 input: Box<HydroNode>,
645 op_metadata: HydroIrOpMetadata,
646 },
647 CycleSink {
648 ident: syn::Ident,
649 input: Box<HydroNode>,
650 op_metadata: HydroIrOpMetadata,
651 },
652}
653
654impl HydroRoot {
655 #[cfg(feature = "build")]
656 pub fn compile_network<'a, D>(
657 &mut self,
658 compile_env: &D::CompileEnv,
659 extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
660 seen_tees: &mut SeenTees,
661 processes: &HashMap<usize, D::Process>,
662 clusters: &HashMap<usize, D::Cluster>,
663 externals: &HashMap<usize, D::External>,
664 ) where
665 D: Deploy<'a>,
666 {
667 let refcell_extra_stmts = RefCell::new(extra_stmts);
668 self.transform_bottom_up(
669 &mut |l| {
670 if let HydroRoot::SendExternal {
671 input,
672 to_external_id,
673 to_key,
674 to_many,
675 unpaired,
676 instantiate_fn,
677 ..
678 } = l
679 {
680 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
681 DebugInstantiate::Building => {
682 let to_node = externals
683 .get(to_external_id)
684 .unwrap_or_else(|| {
685 panic!("A external used in the graph was not instantiated: {}", to_external_id)
686 })
687 .clone();
688
689 match input.metadata().location_kind.root() {
690 LocationId::Process(process_id) => {
691 if *to_many {
692 (
693 (
694 D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_key)),
695 parse_quote!(DUMMY),
696 ),
697 Box::new(|| {}) as Box<dyn FnOnce()>,
698 )
699 } else {
700 let from_node = processes
701 .get(process_id)
702 .unwrap_or_else(|| {
703 panic!("A process used in the graph was not instantiated: {}", process_id)
704 })
705 .clone();
706
707 let sink_port = D::allocate_process_port(&from_node);
708 let source_port = D::allocate_external_port(&to_node);
709
710 if *unpaired {
711 use stageleft::quote_type;
712 use tokio_util::codec::LengthDelimitedCodec;
713
714 to_node.register(*to_key, source_port.clone());
715
716 let _ = D::e2o_source(
717 compile_env,
718 refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
719 &to_node, &source_port,
720 &from_node, &sink_port,
721 "e_type::<LengthDelimitedCodec>(),
722 format!("{}_{}", *to_external_id, *to_key)
723 );
724 }
725
726 (
727 (
728 D::o2e_sink(
729 compile_env,
730 &from_node,
731 &sink_port,
732 &to_node,
733 &source_port,
734 format!("{}_{}", *to_external_id, *to_key)
735 ),
736 parse_quote!(DUMMY),
737 ),
738 if *unpaired {
739 D::e2o_connect(
740 &to_node,
741 &source_port,
742 &from_node,
743 &sink_port,
744 *to_many,
745 NetworkHint::Auto,
746 )
747 } else {
748 Box::new(|| {}) as Box<dyn FnOnce()>
749 },
750 )
751 }
752 }
753 LocationId::Cluster(_) => todo!(),
754 _ => panic!()
755 }
756 },
757
758 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
759 };
760
761 *instantiate_fn = DebugInstantiateFinalized {
762 sink: sink_expr,
763 source: source_expr,
764 connect_fn: Some(connect_fn),
765 }
766 .into();
767 }
768 },
769 &mut |n| {
770 if let HydroNode::Network {
771 input,
772 instantiate_fn,
773 metadata,
774 ..
775 } = n
776 {
777 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
778 DebugInstantiate::Building => instantiate_network::<D>(
779 input.metadata().location_kind.root(),
780 metadata.location_kind.root(),
781 processes,
782 clusters,
783 compile_env,
784 ),
785
786 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
787 };
788
789 *instantiate_fn = DebugInstantiateFinalized {
790 sink: sink_expr,
791 source: source_expr,
792 connect_fn: Some(connect_fn),
793 }
794 .into();
795 } else if let HydroNode::ExternalInput {
796 from_external_id,
797 from_key,
798 from_many,
799 codec_type,
800 port_hint,
801 instantiate_fn,
802 metadata,
803 ..
804 } = n
805 {
806 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
807 DebugInstantiate::Building => {
808 let from_node = externals
809 .get(from_external_id)
810 .unwrap_or_else(|| {
811 panic!(
812 "A external used in the graph was not instantiated: {}",
813 from_external_id
814 )
815 })
816 .clone();
817
818 match metadata.location_kind.root() {
819 LocationId::Process(process_id) => {
820 let to_node = processes
821 .get(process_id)
822 .unwrap_or_else(|| {
823 panic!("A process used in the graph was not instantiated: {}", process_id)
824 })
825 .clone();
826
827 let sink_port = D::allocate_external_port(&from_node);
828 let source_port = D::allocate_process_port(&to_node);
829
830 from_node.register(*from_key, sink_port.clone());
831
832 (
833 (
834 parse_quote!(DUMMY),
835 if *from_many {
836 D::e2o_many_source(
837 compile_env,
838 refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
839 &to_node, &source_port,
840 codec_type.0.as_ref(),
841 format!("{}_{}", *from_external_id, *from_key)
842 )
843 } else {
844 D::e2o_source(
845 compile_env,
846 refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
847 &from_node, &sink_port,
848 &to_node, &source_port,
849 codec_type.0.as_ref(),
850 format!("{}_{}", *from_external_id, *from_key)
851 )
852 },
853 ),
854 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
855 )
856 }
857 LocationId::Cluster(_) => todo!(),
858 _ => panic!()
859 }
860 },
861
862 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
863 };
864
865 *instantiate_fn = DebugInstantiateFinalized {
866 sink: sink_expr,
867 source: source_expr,
868 connect_fn: Some(connect_fn),
869 }
870 .into();
871 }
872 },
873 seen_tees,
874 false,
875 );
876 }
877
878 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
879 self.transform_bottom_up(
880 &mut |l| {
881 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
882 match instantiate_fn {
883 DebugInstantiate::Building => panic!("network not built"),
884
885 DebugInstantiate::Finalized(finalized) => {
886 (finalized.connect_fn.take().unwrap())();
887 }
888 }
889 }
890 },
891 &mut |n| {
892 if let HydroNode::Network { instantiate_fn, .. }
893 | HydroNode::ExternalInput { instantiate_fn, .. } = n
894 {
895 match instantiate_fn {
896 DebugInstantiate::Building => panic!("network not built"),
897
898 DebugInstantiate::Finalized(finalized) => {
899 (finalized.connect_fn.take().unwrap())();
900 }
901 }
902 }
903 },
904 seen_tees,
905 false,
906 );
907 }
908
909 pub fn transform_bottom_up(
910 &mut self,
911 transform_root: &mut impl FnMut(&mut HydroRoot),
912 transform_node: &mut impl FnMut(&mut HydroNode),
913 seen_tees: &mut SeenTees,
914 check_well_formed: bool,
915 ) {
916 self.transform_children(
917 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
918 seen_tees,
919 );
920
921 transform_root(self);
922 }
923
924 pub fn transform_children(
925 &mut self,
926 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
927 seen_tees: &mut SeenTees,
928 ) {
929 match self {
930 HydroRoot::ForEach { input, .. }
931 | HydroRoot::SendExternal { input, .. }
932 | HydroRoot::DestSink { input, .. }
933 | HydroRoot::CycleSink { input, .. } => {
934 transform(input, seen_tees);
935 }
936 }
937 }
938
939 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
940 match self {
941 HydroRoot::ForEach {
942 f,
943 input,
944 op_metadata,
945 } => HydroRoot::ForEach {
946 f: f.clone(),
947 input: Box::new(input.deep_clone(seen_tees)),
948 op_metadata: op_metadata.clone(),
949 },
950 HydroRoot::SendExternal {
951 to_external_id,
952 to_key,
953 to_many,
954 unpaired,
955 serialize_fn,
956 instantiate_fn,
957 input,
958 op_metadata,
959 } => HydroRoot::SendExternal {
960 to_external_id: *to_external_id,
961 to_key: *to_key,
962 to_many: *to_many,
963 unpaired: *unpaired,
964 serialize_fn: serialize_fn.clone(),
965 instantiate_fn: instantiate_fn.clone(),
966 input: Box::new(input.deep_clone(seen_tees)),
967 op_metadata: op_metadata.clone(),
968 },
969 HydroRoot::DestSink {
970 sink,
971 input,
972 op_metadata,
973 } => HydroRoot::DestSink {
974 sink: sink.clone(),
975 input: Box::new(input.deep_clone(seen_tees)),
976 op_metadata: op_metadata.clone(),
977 },
978 HydroRoot::CycleSink {
979 ident,
980 input,
981 op_metadata,
982 } => HydroRoot::CycleSink {
983 ident: ident.clone(),
984 input: Box::new(input.deep_clone(seen_tees)),
985 op_metadata: op_metadata.clone(),
986 },
987 }
988 }
989
990 #[cfg(feature = "build")]
991 pub fn emit(
992 &mut self,
993 graph_builders: &mut dyn DfirBuilder,
994 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
995 next_stmt_id: &mut usize,
996 ) {
997 self.emit_core(
998 &mut BuildersOrCallback::Builders::<
999 fn(&mut HydroRoot, &mut usize),
1000 fn(&mut HydroNode, &mut usize),
1001 >(graph_builders),
1002 built_tees,
1003 next_stmt_id,
1004 );
1005 }
1006
1007 #[cfg(feature = "build")]
1008 pub fn emit_core(
1009 &mut self,
1010 builders_or_callback: &mut BuildersOrCallback<
1011 impl FnMut(&mut HydroRoot, &mut usize),
1012 impl FnMut(&mut HydroNode, &mut usize),
1013 >,
1014 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1015 next_stmt_id: &mut usize,
1016 ) {
1017 match self {
1018 HydroRoot::ForEach { f, input, .. } => {
1019 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1020
1021 match builders_or_callback {
1022 BuildersOrCallback::Builders(graph_builders) => {
1023 graph_builders
1024 .get_dfir_mut(&input.metadata().location_kind)
1025 .add_dfir(
1026 parse_quote! {
1027 #input_ident -> for_each(#f);
1028 },
1029 None,
1030 Some(&next_stmt_id.to_string()),
1031 );
1032 }
1033 BuildersOrCallback::Callback(leaf_callback, _) => {
1034 leaf_callback(self, next_stmt_id);
1035 }
1036 }
1037
1038 *next_stmt_id += 1;
1039 }
1040
1041 HydroRoot::SendExternal {
1042 serialize_fn,
1043 instantiate_fn,
1044 input,
1045 ..
1046 } => {
1047 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1048
1049 match builders_or_callback {
1050 BuildersOrCallback::Builders(graph_builders) => {
1051 let (sink_expr, _) = match instantiate_fn {
1052 DebugInstantiate::Building => (
1053 syn::parse_quote!(DUMMY_SINK),
1054 syn::parse_quote!(DUMMY_SOURCE),
1055 ),
1056
1057 DebugInstantiate::Finalized(finalized) => {
1058 (finalized.sink.clone(), finalized.source.clone())
1059 }
1060 };
1061
1062 graph_builders.create_external_output(
1063 &input.metadata().location_kind,
1064 sink_expr,
1065 &input_ident,
1066 serialize_fn,
1067 *next_stmt_id,
1068 );
1069 }
1070 BuildersOrCallback::Callback(leaf_callback, _) => {
1071 leaf_callback(self, next_stmt_id);
1072 }
1073 }
1074
1075 *next_stmt_id += 1;
1076 }
1077
1078 HydroRoot::DestSink { sink, input, .. } => {
1079 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1080
1081 match builders_or_callback {
1082 BuildersOrCallback::Builders(graph_builders) => {
1083 graph_builders
1084 .get_dfir_mut(&input.metadata().location_kind)
1085 .add_dfir(
1086 parse_quote! {
1087 #input_ident -> dest_sink(#sink);
1088 },
1089 None,
1090 Some(&next_stmt_id.to_string()),
1091 );
1092 }
1093 BuildersOrCallback::Callback(leaf_callback, _) => {
1094 leaf_callback(self, next_stmt_id);
1095 }
1096 }
1097
1098 *next_stmt_id += 1;
1099 }
1100
1101 HydroRoot::CycleSink { ident, input, .. } => {
1102 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1103
1104 match builders_or_callback {
1105 BuildersOrCallback::Builders(graph_builders) => {
1106 graph_builders
1107 .get_dfir_mut(&input.metadata().location_kind)
1108 .add_dfir(
1109 parse_quote! {
1110 #ident = #input_ident;
1111 },
1112 None,
1113 None,
1114 );
1115 }
1116 BuildersOrCallback::Callback(_, _) => {}
1118 }
1119 }
1120 }
1121 }
1122
1123 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1124 match self {
1125 HydroRoot::ForEach { op_metadata, .. }
1126 | HydroRoot::SendExternal { op_metadata, .. }
1127 | HydroRoot::DestSink { op_metadata, .. }
1128 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1129 }
1130 }
1131
1132 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1133 match self {
1134 HydroRoot::ForEach { op_metadata, .. }
1135 | HydroRoot::SendExternal { op_metadata, .. }
1136 | HydroRoot::DestSink { op_metadata, .. }
1137 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1138 }
1139 }
1140
1141 pub fn input(&self) -> &HydroNode {
1142 match self {
1143 HydroRoot::ForEach { input, .. }
1144 | HydroRoot::SendExternal { input, .. }
1145 | HydroRoot::DestSink { input, .. }
1146 | HydroRoot::CycleSink { input, .. } => input,
1147 }
1148 }
1149
1150 pub fn input_metadata(&self) -> &HydroIrMetadata {
1151 self.input().metadata()
1152 }
1153
1154 pub fn print_root(&self) -> String {
1155 match self {
1156 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1157 HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
1158 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1159 HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1160 }
1161 }
1162
1163 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1164 match self {
1165 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1166 transform(f);
1167 }
1168 HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1169 }
1170 }
1171}
1172
1173#[cfg(feature = "build")]
1174pub fn emit(ir: &mut Vec<HydroRoot>) -> BTreeMap<usize, FlatGraphBuilder> {
1175 let mut builders = BTreeMap::new();
1176 let mut built_tees = HashMap::new();
1177 let mut next_stmt_id = 0;
1178 for leaf in ir {
1179 leaf.emit(&mut builders, &mut built_tees, &mut next_stmt_id);
1180 }
1181 builders
1182}
1183
1184#[cfg(feature = "build")]
1185pub fn traverse_dfir(
1186 ir: &mut [HydroRoot],
1187 transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1188 transform_node: impl FnMut(&mut HydroNode, &mut usize),
1189) {
1190 let mut seen_tees = HashMap::new();
1191 let mut next_stmt_id = 0;
1192 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1193 ir.iter_mut().for_each(|leaf| {
1194 leaf.emit_core(&mut callback, &mut seen_tees, &mut next_stmt_id);
1195 });
1196}
1197
1198pub fn transform_bottom_up(
1199 ir: &mut [HydroRoot],
1200 transform_root: &mut impl FnMut(&mut HydroRoot),
1201 transform_node: &mut impl FnMut(&mut HydroNode),
1202 check_well_formed: bool,
1203) {
1204 let mut seen_tees = HashMap::new();
1205 ir.iter_mut().for_each(|leaf| {
1206 leaf.transform_bottom_up(
1207 transform_root,
1208 transform_node,
1209 &mut seen_tees,
1210 check_well_formed,
1211 );
1212 });
1213}
1214
1215pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1216 let mut seen_tees = HashMap::new();
1217 ir.iter()
1218 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1219 .collect()
1220}
1221
1222type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1223thread_local! {
1224 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1225}
1226
1227pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1228 PRINTED_TEES.with(|printed_tees| {
1229 let mut printed_tees_mut = printed_tees.borrow_mut();
1230 *printed_tees_mut = Some((0, HashMap::new()));
1231 drop(printed_tees_mut);
1232
1233 let ret = f();
1234
1235 let mut printed_tees_mut = printed_tees.borrow_mut();
1236 *printed_tees_mut = None;
1237
1238 ret
1239 })
1240}
1241
1242pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1243
1244impl TeeNode {
1245 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1246 Rc::as_ptr(&self.0)
1247 }
1248}
1249
1250impl Debug for TeeNode {
1251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1252 PRINTED_TEES.with(|printed_tees| {
1253 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1254 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1255
1256 if let Some(printed_tees_mut) = printed_tees_mut {
1257 if let Some(existing) = printed_tees_mut
1258 .1
1259 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1260 {
1261 write!(f, "<tee {}>", existing)
1262 } else {
1263 let next_id = printed_tees_mut.0;
1264 printed_tees_mut.0 += 1;
1265 printed_tees_mut
1266 .1
1267 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1268 drop(printed_tees_mut_borrow);
1269 write!(f, "<tee {}>: ", next_id)?;
1270 Debug::fmt(&self.0.borrow(), f)
1271 }
1272 } else {
1273 drop(printed_tees_mut_borrow);
1274 write!(f, "<tee>: ")?;
1275 Debug::fmt(&self.0.borrow(), f)
1276 }
1277 })
1278 }
1279}
1280
1281impl Hash for TeeNode {
1282 fn hash<H: Hasher>(&self, state: &mut H) {
1283 self.0.borrow_mut().hash(state);
1284 }
1285}
1286
1287#[derive(Clone, PartialEq, Eq, Debug)]
1288pub enum BoundKind {
1289 Unbounded,
1290 Bounded,
1291}
1292
1293#[derive(Clone, PartialEq, Eq, Debug)]
1294pub enum StreamOrder {
1295 NoOrder,
1296 TotalOrder,
1297}
1298
1299#[derive(Clone, PartialEq, Eq, Debug)]
1300pub enum StreamRetry {
1301 AtLeastOnce,
1302 ExactlyOnce,
1303}
1304
1305#[derive(Clone, PartialEq, Eq, Debug)]
1306pub enum KeyedSingletonBoundKind {
1307 Unbounded,
1308 BoundedValue,
1309 Bounded,
1310}
1311
1312#[derive(Clone, PartialEq, Eq, Debug)]
1313pub enum CollectionKind {
1314 Stream {
1315 bound: BoundKind,
1316 order: StreamOrder,
1317 retry: StreamRetry,
1318 element_type: DebugType,
1319 },
1320 Singleton {
1321 bound: BoundKind,
1322 element_type: DebugType,
1323 },
1324 Optional {
1325 bound: BoundKind,
1326 element_type: DebugType,
1327 },
1328 KeyedStream {
1329 bound: BoundKind,
1330 value_order: StreamOrder,
1331 value_retry: StreamRetry,
1332 key_type: DebugType,
1333 value_type: DebugType,
1334 },
1335 KeyedSingleton {
1336 bound: KeyedSingletonBoundKind,
1337 key_type: DebugType,
1338 value_type: DebugType,
1339 },
1340}
1341
1342#[derive(Clone)]
1343pub struct HydroIrMetadata {
1344 pub location_kind: LocationId,
1345 pub collection_kind: CollectionKind,
1346 pub cardinality: Option<usize>,
1347 pub tag: Option<String>,
1348 pub op: HydroIrOpMetadata,
1349}
1350
1351impl Hash for HydroIrMetadata {
1353 fn hash<H: Hasher>(&self, _: &mut H) {}
1354}
1355
1356impl PartialEq for HydroIrMetadata {
1357 fn eq(&self, _: &Self) -> bool {
1358 true
1359 }
1360}
1361
1362impl Eq for HydroIrMetadata {}
1363
1364impl Debug for HydroIrMetadata {
1365 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1366 f.debug_struct("HydroIrMetadata")
1367 .field("location_kind", &self.location_kind)
1368 .field("collection_kind", &self.collection_kind)
1369 .finish()
1370 }
1371}
1372
1373#[derive(Clone)]
1376pub struct HydroIrOpMetadata {
1377 pub backtrace: Backtrace,
1378 pub cpu_usage: Option<f64>,
1379 pub network_recv_cpu_usage: Option<f64>,
1380 pub id: Option<usize>,
1381}
1382
1383impl HydroIrOpMetadata {
1384 #[expect(
1385 clippy::new_without_default,
1386 reason = "explicit calls to new ensure correct backtrace bounds"
1387 )]
1388 pub fn new() -> HydroIrOpMetadata {
1389 Self::new_with_skip(1)
1390 }
1391
1392 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1393 HydroIrOpMetadata {
1394 backtrace: Backtrace::get_backtrace(2 + skip_count),
1395 cpu_usage: None,
1396 network_recv_cpu_usage: None,
1397 id: None,
1398 }
1399 }
1400}
1401
1402impl Debug for HydroIrOpMetadata {
1403 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1404 f.debug_struct("HydroIrOpMetadata").finish()
1405 }
1406}
1407
1408impl Hash for HydroIrOpMetadata {
1409 fn hash<H: Hasher>(&self, _: &mut H) {}
1410}
1411
1412#[derive(Debug, Hash)]
1415pub enum HydroNode {
1416 Placeholder,
1417
1418 Cast {
1426 inner: Box<HydroNode>,
1427 metadata: HydroIrMetadata,
1428 },
1429
1430 ObserveNonDet {
1436 inner: Box<HydroNode>,
1437 trusted: bool, metadata: HydroIrMetadata,
1439 },
1440
1441 Source {
1442 source: HydroSource,
1443 metadata: HydroIrMetadata,
1444 },
1445
1446 SingletonSource {
1447 value: DebugExpr,
1448 metadata: HydroIrMetadata,
1449 },
1450
1451 CycleSource {
1452 ident: syn::Ident,
1453 metadata: HydroIrMetadata,
1454 },
1455
1456 Tee {
1457 inner: TeeNode,
1458 metadata: HydroIrMetadata,
1459 },
1460
1461 Persist {
1462 inner: Box<HydroNode>,
1463 metadata: HydroIrMetadata,
1464 },
1465
1466 BeginAtomic {
1467 inner: Box<HydroNode>,
1468 metadata: HydroIrMetadata,
1469 },
1470
1471 EndAtomic {
1472 inner: Box<HydroNode>,
1473 metadata: HydroIrMetadata,
1474 },
1475
1476 Batch {
1477 inner: Box<HydroNode>,
1478 metadata: HydroIrMetadata,
1479 },
1480
1481 YieldConcat {
1482 inner: Box<HydroNode>,
1483 metadata: HydroIrMetadata,
1484 },
1485
1486 Chain {
1487 first: Box<HydroNode>,
1488 second: Box<HydroNode>,
1489 metadata: HydroIrMetadata,
1490 },
1491
1492 ChainFirst {
1493 first: Box<HydroNode>,
1494 second: Box<HydroNode>,
1495 metadata: HydroIrMetadata,
1496 },
1497
1498 CrossProduct {
1499 left: Box<HydroNode>,
1500 right: Box<HydroNode>,
1501 metadata: HydroIrMetadata,
1502 },
1503
1504 CrossSingleton {
1505 left: Box<HydroNode>,
1506 right: Box<HydroNode>,
1507 metadata: HydroIrMetadata,
1508 },
1509
1510 Join {
1511 left: Box<HydroNode>,
1512 right: Box<HydroNode>,
1513 metadata: HydroIrMetadata,
1514 },
1515
1516 Difference {
1517 pos: Box<HydroNode>,
1518 neg: Box<HydroNode>,
1519 metadata: HydroIrMetadata,
1520 },
1521
1522 AntiJoin {
1523 pos: Box<HydroNode>,
1524 neg: Box<HydroNode>,
1525 metadata: HydroIrMetadata,
1526 },
1527
1528 ResolveFutures {
1529 input: Box<HydroNode>,
1530 metadata: HydroIrMetadata,
1531 },
1532 ResolveFuturesOrdered {
1533 input: Box<HydroNode>,
1534 metadata: HydroIrMetadata,
1535 },
1536
1537 Map {
1538 f: DebugExpr,
1539 input: Box<HydroNode>,
1540 metadata: HydroIrMetadata,
1541 },
1542 FlatMap {
1543 f: DebugExpr,
1544 input: Box<HydroNode>,
1545 metadata: HydroIrMetadata,
1546 },
1547 Filter {
1548 f: DebugExpr,
1549 input: Box<HydroNode>,
1550 metadata: HydroIrMetadata,
1551 },
1552 FilterMap {
1553 f: DebugExpr,
1554 input: Box<HydroNode>,
1555 metadata: HydroIrMetadata,
1556 },
1557
1558 DeferTick {
1559 input: Box<HydroNode>,
1560 metadata: HydroIrMetadata,
1561 },
1562 Enumerate {
1563 input: Box<HydroNode>,
1564 metadata: HydroIrMetadata,
1565 },
1566 Inspect {
1567 f: DebugExpr,
1568 input: Box<HydroNode>,
1569 metadata: HydroIrMetadata,
1570 },
1571
1572 Unique {
1573 input: Box<HydroNode>,
1574 metadata: HydroIrMetadata,
1575 },
1576
1577 Sort {
1578 input: Box<HydroNode>,
1579 metadata: HydroIrMetadata,
1580 },
1581 Fold {
1582 init: DebugExpr,
1583 acc: DebugExpr,
1584 input: Box<HydroNode>,
1585 metadata: HydroIrMetadata,
1586 },
1587
1588 Scan {
1589 init: DebugExpr,
1590 acc: DebugExpr,
1591 input: Box<HydroNode>,
1592 metadata: HydroIrMetadata,
1593 },
1594 FoldKeyed {
1595 init: DebugExpr,
1596 acc: DebugExpr,
1597 input: Box<HydroNode>,
1598 metadata: HydroIrMetadata,
1599 },
1600
1601 Reduce {
1602 f: DebugExpr,
1603 input: Box<HydroNode>,
1604 metadata: HydroIrMetadata,
1605 },
1606 ReduceKeyed {
1607 f: DebugExpr,
1608 input: Box<HydroNode>,
1609 metadata: HydroIrMetadata,
1610 },
1611 ReduceKeyedWatermark {
1612 f: DebugExpr,
1613 input: Box<HydroNode>,
1614 watermark: Box<HydroNode>,
1615 metadata: HydroIrMetadata,
1616 },
1617
1618 Network {
1619 serialize_fn: Option<DebugExpr>,
1620 instantiate_fn: DebugInstantiate,
1621 deserialize_fn: Option<DebugExpr>,
1622 input: Box<HydroNode>,
1623 metadata: HydroIrMetadata,
1624 },
1625
1626 ExternalInput {
1627 from_external_id: usize,
1628 from_key: usize,
1629 from_many: bool,
1630 codec_type: DebugType,
1631 port_hint: NetworkHint,
1632 instantiate_fn: DebugInstantiate,
1633 deserialize_fn: Option<DebugExpr>,
1634 metadata: HydroIrMetadata,
1635 },
1636
1637 Counter {
1638 tag: String,
1639 duration: DebugExpr,
1640 prefix: String,
1641 input: Box<HydroNode>,
1642 metadata: HydroIrMetadata,
1643 },
1644}
1645
1646pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1647pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1648
1649impl HydroNode {
1650 pub fn transform_bottom_up(
1651 &mut self,
1652 transform: &mut impl FnMut(&mut HydroNode),
1653 seen_tees: &mut SeenTees,
1654 check_well_formed: bool,
1655 ) {
1656 self.transform_children(
1657 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1658 seen_tees,
1659 );
1660
1661 transform(self);
1662
1663 let self_location = self.metadata().location_kind.root();
1664
1665 if check_well_formed {
1666 match &*self {
1667 HydroNode::Network { .. } => {}
1668 _ => {
1669 self.input_metadata().iter().for_each(|i| {
1670 if i.location_kind.root() != self_location {
1671 panic!(
1672 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1673 i,
1674 i.location_kind.root(),
1675 self,
1676 self_location
1677 )
1678 }
1679 });
1680 }
1681 }
1682 }
1683 }
1684
1685 #[inline(always)]
1686 pub fn transform_children(
1687 &mut self,
1688 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1689 seen_tees: &mut SeenTees,
1690 ) {
1691 match self {
1692 HydroNode::Placeholder => {
1693 panic!();
1694 }
1695
1696 HydroNode::Source { .. }
1697 | HydroNode::SingletonSource { .. }
1698 | HydroNode::CycleSource { .. }
1699 | HydroNode::ExternalInput { .. } => {}
1700
1701 HydroNode::Tee { inner, .. } => {
1702 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1703 *inner = TeeNode(transformed.clone());
1704 } else {
1705 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1706 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1707 let mut orig = inner.0.replace(HydroNode::Placeholder);
1708 transform(&mut orig, seen_tees);
1709 *transformed_cell.borrow_mut() = orig;
1710 *inner = TeeNode(transformed_cell);
1711 }
1712 }
1713
1714 HydroNode::Cast { inner, .. }
1715 | HydroNode::ObserveNonDet { inner, .. }
1716 | HydroNode::Persist { inner, .. }
1717 | HydroNode::BeginAtomic { inner, .. }
1718 | HydroNode::EndAtomic { inner, .. }
1719 | HydroNode::Batch { inner, .. }
1720 | HydroNode::YieldConcat { inner, .. } => {
1721 transform(inner.as_mut(), seen_tees);
1722 }
1723
1724 HydroNode::Chain { first, second, .. } => {
1725 transform(first.as_mut(), seen_tees);
1726 transform(second.as_mut(), seen_tees);
1727 }
1728
1729 HydroNode::ChainFirst { first, second, .. } => {
1730 transform(first.as_mut(), seen_tees);
1731 transform(second.as_mut(), seen_tees);
1732 }
1733
1734 HydroNode::CrossSingleton { left, right, .. }
1735 | HydroNode::CrossProduct { left, right, .. }
1736 | HydroNode::Join { left, right, .. } => {
1737 transform(left.as_mut(), seen_tees);
1738 transform(right.as_mut(), seen_tees);
1739 }
1740
1741 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1742 transform(pos.as_mut(), seen_tees);
1743 transform(neg.as_mut(), seen_tees);
1744 }
1745
1746 HydroNode::ReduceKeyedWatermark {
1747 input, watermark, ..
1748 } => {
1749 transform(input.as_mut(), seen_tees);
1750 transform(watermark.as_mut(), seen_tees);
1751 }
1752
1753 HydroNode::Map { input, .. }
1754 | HydroNode::ResolveFutures { input, .. }
1755 | HydroNode::ResolveFuturesOrdered { input, .. }
1756 | HydroNode::FlatMap { input, .. }
1757 | HydroNode::Filter { input, .. }
1758 | HydroNode::FilterMap { input, .. }
1759 | HydroNode::Sort { input, .. }
1760 | HydroNode::DeferTick { input, .. }
1761 | HydroNode::Enumerate { input, .. }
1762 | HydroNode::Inspect { input, .. }
1763 | HydroNode::Unique { input, .. }
1764 | HydroNode::Network { input, .. }
1765 | HydroNode::Fold { input, .. }
1766 | HydroNode::Scan { input, .. }
1767 | HydroNode::FoldKeyed { input, .. }
1768 | HydroNode::Reduce { input, .. }
1769 | HydroNode::ReduceKeyed { input, .. }
1770 | HydroNode::Counter { input, .. } => {
1771 transform(input.as_mut(), seen_tees);
1772 }
1773 }
1774 }
1775
1776 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1777 match self {
1778 HydroNode::Placeholder => HydroNode::Placeholder,
1779 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1780 inner: Box::new(inner.deep_clone(seen_tees)),
1781 metadata: metadata.clone(),
1782 },
1783 HydroNode::ObserveNonDet {
1784 inner,
1785 trusted,
1786 metadata,
1787 } => HydroNode::ObserveNonDet {
1788 inner: Box::new(inner.deep_clone(seen_tees)),
1789 trusted: *trusted,
1790 metadata: metadata.clone(),
1791 },
1792 HydroNode::Source { source, metadata } => HydroNode::Source {
1793 source: source.clone(),
1794 metadata: metadata.clone(),
1795 },
1796 HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1797 value: value.clone(),
1798 metadata: metadata.clone(),
1799 },
1800 HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1801 ident: ident.clone(),
1802 metadata: metadata.clone(),
1803 },
1804 HydroNode::Tee { inner, metadata } => {
1805 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1806 HydroNode::Tee {
1807 inner: TeeNode(transformed.clone()),
1808 metadata: metadata.clone(),
1809 }
1810 } else {
1811 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1812 seen_tees.insert(inner.as_ptr(), new_rc.clone());
1813 let cloned = inner.0.borrow().deep_clone(seen_tees);
1814 *new_rc.borrow_mut() = cloned;
1815 HydroNode::Tee {
1816 inner: TeeNode(new_rc),
1817 metadata: metadata.clone(),
1818 }
1819 }
1820 }
1821 HydroNode::Persist { inner, metadata } => HydroNode::Persist {
1822 inner: Box::new(inner.deep_clone(seen_tees)),
1823 metadata: metadata.clone(),
1824 },
1825 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1826 inner: Box::new(inner.deep_clone(seen_tees)),
1827 metadata: metadata.clone(),
1828 },
1829 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1830 inner: Box::new(inner.deep_clone(seen_tees)),
1831 metadata: metadata.clone(),
1832 },
1833 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1834 inner: Box::new(inner.deep_clone(seen_tees)),
1835 metadata: metadata.clone(),
1836 },
1837 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1838 inner: Box::new(inner.deep_clone(seen_tees)),
1839 metadata: metadata.clone(),
1840 },
1841 HydroNode::Chain {
1842 first,
1843 second,
1844 metadata,
1845 } => HydroNode::Chain {
1846 first: Box::new(first.deep_clone(seen_tees)),
1847 second: Box::new(second.deep_clone(seen_tees)),
1848 metadata: metadata.clone(),
1849 },
1850 HydroNode::ChainFirst {
1851 first,
1852 second,
1853 metadata,
1854 } => HydroNode::ChainFirst {
1855 first: Box::new(first.deep_clone(seen_tees)),
1856 second: Box::new(second.deep_clone(seen_tees)),
1857 metadata: metadata.clone(),
1858 },
1859 HydroNode::CrossProduct {
1860 left,
1861 right,
1862 metadata,
1863 } => HydroNode::CrossProduct {
1864 left: Box::new(left.deep_clone(seen_tees)),
1865 right: Box::new(right.deep_clone(seen_tees)),
1866 metadata: metadata.clone(),
1867 },
1868 HydroNode::CrossSingleton {
1869 left,
1870 right,
1871 metadata,
1872 } => HydroNode::CrossSingleton {
1873 left: Box::new(left.deep_clone(seen_tees)),
1874 right: Box::new(right.deep_clone(seen_tees)),
1875 metadata: metadata.clone(),
1876 },
1877 HydroNode::Join {
1878 left,
1879 right,
1880 metadata,
1881 } => HydroNode::Join {
1882 left: Box::new(left.deep_clone(seen_tees)),
1883 right: Box::new(right.deep_clone(seen_tees)),
1884 metadata: metadata.clone(),
1885 },
1886 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1887 pos: Box::new(pos.deep_clone(seen_tees)),
1888 neg: Box::new(neg.deep_clone(seen_tees)),
1889 metadata: metadata.clone(),
1890 },
1891 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1892 pos: Box::new(pos.deep_clone(seen_tees)),
1893 neg: Box::new(neg.deep_clone(seen_tees)),
1894 metadata: metadata.clone(),
1895 },
1896 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1897 input: Box::new(input.deep_clone(seen_tees)),
1898 metadata: metadata.clone(),
1899 },
1900 HydroNode::ResolveFuturesOrdered { input, metadata } => {
1901 HydroNode::ResolveFuturesOrdered {
1902 input: Box::new(input.deep_clone(seen_tees)),
1903 metadata: metadata.clone(),
1904 }
1905 }
1906 HydroNode::Map { f, input, metadata } => HydroNode::Map {
1907 f: f.clone(),
1908 input: Box::new(input.deep_clone(seen_tees)),
1909 metadata: metadata.clone(),
1910 },
1911 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1912 f: f.clone(),
1913 input: Box::new(input.deep_clone(seen_tees)),
1914 metadata: metadata.clone(),
1915 },
1916 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1917 f: f.clone(),
1918 input: Box::new(input.deep_clone(seen_tees)),
1919 metadata: metadata.clone(),
1920 },
1921 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1922 f: f.clone(),
1923 input: Box::new(input.deep_clone(seen_tees)),
1924 metadata: metadata.clone(),
1925 },
1926 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1927 input: Box::new(input.deep_clone(seen_tees)),
1928 metadata: metadata.clone(),
1929 },
1930 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
1931 input: Box::new(input.deep_clone(seen_tees)),
1932 metadata: metadata.clone(),
1933 },
1934 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1935 f: f.clone(),
1936 input: Box::new(input.deep_clone(seen_tees)),
1937 metadata: metadata.clone(),
1938 },
1939 HydroNode::Unique { input, metadata } => HydroNode::Unique {
1940 input: Box::new(input.deep_clone(seen_tees)),
1941 metadata: metadata.clone(),
1942 },
1943 HydroNode::Sort { input, metadata } => HydroNode::Sort {
1944 input: Box::new(input.deep_clone(seen_tees)),
1945 metadata: metadata.clone(),
1946 },
1947 HydroNode::Fold {
1948 init,
1949 acc,
1950 input,
1951 metadata,
1952 } => HydroNode::Fold {
1953 init: init.clone(),
1954 acc: acc.clone(),
1955 input: Box::new(input.deep_clone(seen_tees)),
1956 metadata: metadata.clone(),
1957 },
1958 HydroNode::Scan {
1959 init,
1960 acc,
1961 input,
1962 metadata,
1963 } => HydroNode::Scan {
1964 init: init.clone(),
1965 acc: acc.clone(),
1966 input: Box::new(input.deep_clone(seen_tees)),
1967 metadata: metadata.clone(),
1968 },
1969 HydroNode::FoldKeyed {
1970 init,
1971 acc,
1972 input,
1973 metadata,
1974 } => HydroNode::FoldKeyed {
1975 init: init.clone(),
1976 acc: acc.clone(),
1977 input: Box::new(input.deep_clone(seen_tees)),
1978 metadata: metadata.clone(),
1979 },
1980 HydroNode::ReduceKeyedWatermark {
1981 f,
1982 input,
1983 watermark,
1984 metadata,
1985 } => HydroNode::ReduceKeyedWatermark {
1986 f: f.clone(),
1987 input: Box::new(input.deep_clone(seen_tees)),
1988 watermark: Box::new(watermark.deep_clone(seen_tees)),
1989 metadata: metadata.clone(),
1990 },
1991 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1992 f: f.clone(),
1993 input: Box::new(input.deep_clone(seen_tees)),
1994 metadata: metadata.clone(),
1995 },
1996 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1997 f: f.clone(),
1998 input: Box::new(input.deep_clone(seen_tees)),
1999 metadata: metadata.clone(),
2000 },
2001 HydroNode::Network {
2002 serialize_fn,
2003 instantiate_fn,
2004 deserialize_fn,
2005 input,
2006 metadata,
2007 } => HydroNode::Network {
2008 serialize_fn: serialize_fn.clone(),
2009 instantiate_fn: instantiate_fn.clone(),
2010 deserialize_fn: deserialize_fn.clone(),
2011 input: Box::new(input.deep_clone(seen_tees)),
2012 metadata: metadata.clone(),
2013 },
2014 HydroNode::ExternalInput {
2015 from_external_id,
2016 from_key,
2017 from_many,
2018 codec_type,
2019 port_hint,
2020 instantiate_fn,
2021 deserialize_fn,
2022 metadata,
2023 } => HydroNode::ExternalInput {
2024 from_external_id: *from_external_id,
2025 from_key: *from_key,
2026 from_many: *from_many,
2027 codec_type: codec_type.clone(),
2028 port_hint: *port_hint,
2029 instantiate_fn: instantiate_fn.clone(),
2030 deserialize_fn: deserialize_fn.clone(),
2031 metadata: metadata.clone(),
2032 },
2033 HydroNode::Counter {
2034 tag,
2035 duration,
2036 prefix,
2037 input,
2038 metadata,
2039 } => HydroNode::Counter {
2040 tag: tag.clone(),
2041 duration: duration.clone(),
2042 prefix: prefix.clone(),
2043 input: Box::new(input.deep_clone(seen_tees)),
2044 metadata: metadata.clone(),
2045 },
2046 }
2047 }
2048
2049 #[cfg(feature = "build")]
2050 pub fn emit_core(
2051 &mut self,
2052 builders_or_callback: &mut BuildersOrCallback<
2053 impl FnMut(&mut HydroRoot, &mut usize),
2054 impl FnMut(&mut HydroNode, &mut usize),
2055 >,
2056 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2057 next_stmt_id: &mut usize,
2058 ) -> syn::Ident {
2059 let out_location = self.metadata().location_kind.clone();
2060 match self {
2061 HydroNode::Placeholder => {
2062 panic!()
2063 }
2064
2065 HydroNode::Cast { inner, .. } => {
2066 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2067
2068 match builders_or_callback {
2069 BuildersOrCallback::Builders(_) => {}
2070 BuildersOrCallback::Callback(_, node_callback) => {
2071 node_callback(self, next_stmt_id);
2072 }
2073 }
2074
2075 *next_stmt_id += 1;
2076
2077 inner_ident
2078 }
2079
2080 HydroNode::ObserveNonDet {
2081 inner,
2082 trusted,
2083 metadata,
2084 ..
2085 } => {
2086 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2087
2088 let observe_ident =
2089 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2090
2091 match builders_or_callback {
2092 BuildersOrCallback::Builders(graph_builders) => {
2093 graph_builders.observe_nondet(
2094 *trusted,
2095 &inner.metadata().location_kind,
2096 inner_ident,
2097 &inner.metadata().collection_kind,
2098 &observe_ident,
2099 &metadata.collection_kind,
2100 );
2101 }
2102 BuildersOrCallback::Callback(_, node_callback) => {
2103 node_callback(self, next_stmt_id);
2104 }
2105 }
2106
2107 *next_stmt_id += 1;
2108
2109 observe_ident
2110 }
2111
2112 HydroNode::Persist { inner, .. } => {
2113 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2114
2115 let persist_ident =
2116 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2117
2118 match builders_or_callback {
2119 BuildersOrCallback::Builders(graph_builders) => {
2120 let builder = graph_builders.get_dfir_mut(&out_location);
2121 builder.add_dfir(
2122 parse_quote! {
2123 #persist_ident = #inner_ident -> persist::<'static>();
2124 },
2125 None,
2126 Some(&next_stmt_id.to_string()),
2127 );
2128 }
2129 BuildersOrCallback::Callback(_, node_callback) => {
2130 node_callback(self, next_stmt_id);
2131 }
2132 }
2133
2134 *next_stmt_id += 1;
2135
2136 persist_ident
2137 }
2138
2139 HydroNode::Batch {
2140 inner, metadata, ..
2141 } => {
2142 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2143
2144 let batch_ident =
2145 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2146
2147 match builders_or_callback {
2148 BuildersOrCallback::Builders(graph_builders) => {
2149 graph_builders.batch(
2150 inner_ident,
2151 &inner.metadata().location_kind,
2152 &inner.metadata().collection_kind,
2153 &batch_ident,
2154 &out_location,
2155 &metadata.op,
2156 );
2157 }
2158 BuildersOrCallback::Callback(_, node_callback) => {
2159 node_callback(self, next_stmt_id);
2160 }
2161 }
2162
2163 *next_stmt_id += 1;
2164
2165 batch_ident
2166 }
2167
2168 HydroNode::YieldConcat { inner, .. } => {
2169 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2170
2171 let yield_ident =
2172 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2173
2174 match builders_or_callback {
2175 BuildersOrCallback::Builders(graph_builders) => {
2176 graph_builders.yield_from_tick(
2177 inner_ident,
2178 &inner.metadata().location_kind,
2179 &inner.metadata().collection_kind,
2180 &yield_ident,
2181 &out_location,
2182 );
2183 }
2184 BuildersOrCallback::Callback(_, node_callback) => {
2185 node_callback(self, next_stmt_id);
2186 }
2187 }
2188
2189 *next_stmt_id += 1;
2190
2191 yield_ident
2192 }
2193
2194 HydroNode::BeginAtomic { inner, metadata } => {
2195 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2196
2197 let begin_ident =
2198 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2199
2200 match builders_or_callback {
2201 BuildersOrCallback::Builders(graph_builders) => {
2202 graph_builders.begin_atomic(
2203 inner_ident,
2204 &inner.metadata().location_kind,
2205 &inner.metadata().collection_kind,
2206 &begin_ident,
2207 &out_location,
2208 &metadata.op,
2209 );
2210 }
2211 BuildersOrCallback::Callback(_, node_callback) => {
2212 node_callback(self, next_stmt_id);
2213 }
2214 }
2215
2216 *next_stmt_id += 1;
2217
2218 begin_ident
2219 }
2220
2221 HydroNode::EndAtomic { inner, .. } => {
2222 let inner_ident = inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2223
2224 let end_ident =
2225 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2226
2227 match builders_or_callback {
2228 BuildersOrCallback::Builders(graph_builders) => {
2229 graph_builders.end_atomic(
2230 inner_ident,
2231 &inner.metadata().location_kind,
2232 &inner.metadata().collection_kind,
2233 &end_ident,
2234 );
2235 }
2236 BuildersOrCallback::Callback(_, node_callback) => {
2237 node_callback(self, next_stmt_id);
2238 }
2239 }
2240
2241 *next_stmt_id += 1;
2242
2243 end_ident
2244 }
2245
2246 HydroNode::Source {
2247 source, metadata, ..
2248 } => {
2249 if let HydroSource::ExternalNetwork() = source {
2250 syn::Ident::new("DUMMY", Span::call_site())
2251 } else {
2252 let source_ident =
2253 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2254
2255 let source_stmt = match source {
2256 HydroSource::Stream(expr) => {
2257 debug_assert!(metadata.location_kind.is_top_level());
2258 parse_quote! {
2259 #source_ident = source_stream(#expr);
2260 }
2261 }
2262
2263 HydroSource::ExternalNetwork() => {
2264 unreachable!()
2265 }
2266
2267 HydroSource::Iter(expr) => {
2268 if metadata.location_kind.is_top_level() {
2269 parse_quote! {
2270 #source_ident = source_iter(#expr);
2271 }
2272 } else {
2273 parse_quote! {
2275 #source_ident = source_iter(#expr) -> persist::<'static>();
2276 }
2277 }
2278 }
2279
2280 HydroSource::Spin() => {
2281 debug_assert!(metadata.location_kind.is_top_level());
2282 parse_quote! {
2283 #source_ident = spin();
2284 }
2285 }
2286 };
2287
2288 match builders_or_callback {
2289 BuildersOrCallback::Builders(graph_builders) => {
2290 let builder = graph_builders.get_dfir_mut(&out_location);
2291 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2292 }
2293 BuildersOrCallback::Callback(_, node_callback) => {
2294 node_callback(self, next_stmt_id);
2295 }
2296 }
2297
2298 *next_stmt_id += 1;
2299
2300 source_ident
2301 }
2302 }
2303
2304 HydroNode::SingletonSource { value, metadata } => {
2305 let source_ident =
2306 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2307
2308 match builders_or_callback {
2309 BuildersOrCallback::Builders(graph_builders) => {
2310 let should_replay = !graph_builders.singleton_intermediates();
2311 let builder = graph_builders.get_dfir_mut(&out_location);
2312
2313 if should_replay || !metadata.location_kind.is_top_level() {
2314 builder.add_dfir(
2315 parse_quote! {
2316 #source_ident = source_iter([#value]) -> persist::<'static>();
2317 },
2318 None,
2319 Some(&next_stmt_id.to_string()),
2320 );
2321 } else {
2322 builder.add_dfir(
2323 parse_quote! {
2324 #source_ident = source_iter([#value]);
2325 },
2326 None,
2327 Some(&next_stmt_id.to_string()),
2328 );
2329 }
2330 }
2331 BuildersOrCallback::Callback(_, node_callback) => {
2332 node_callback(self, next_stmt_id);
2333 }
2334 }
2335
2336 *next_stmt_id += 1;
2337
2338 source_ident
2339 }
2340
2341 HydroNode::CycleSource { ident, .. } => {
2342 let ident = ident.clone();
2343
2344 match builders_or_callback {
2345 BuildersOrCallback::Builders(_) => {}
2346 BuildersOrCallback::Callback(_, node_callback) => {
2347 node_callback(self, next_stmt_id);
2348 }
2349 }
2350
2351 *next_stmt_id += 1;
2353
2354 ident
2355 }
2356
2357 HydroNode::Tee { inner, .. } => {
2358 let ret_ident = if let Some(teed_from) =
2359 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2360 {
2361 match builders_or_callback {
2362 BuildersOrCallback::Builders(_) => {}
2363 BuildersOrCallback::Callback(_, node_callback) => {
2364 node_callback(self, next_stmt_id);
2365 }
2366 }
2367
2368 teed_from.clone()
2369 } else {
2370 let inner_ident = inner.0.borrow_mut().emit_core(
2371 builders_or_callback,
2372 built_tees,
2373 next_stmt_id,
2374 );
2375
2376 let tee_ident =
2377 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2378
2379 built_tees.insert(
2380 inner.0.as_ref() as *const RefCell<HydroNode>,
2381 tee_ident.clone(),
2382 );
2383
2384 match builders_or_callback {
2385 BuildersOrCallback::Builders(graph_builders) => {
2386 let builder = graph_builders.get_dfir_mut(&out_location);
2387 builder.add_dfir(
2388 parse_quote! {
2389 #tee_ident = #inner_ident -> tee();
2390 },
2391 None,
2392 Some(&next_stmt_id.to_string()),
2393 );
2394 }
2395 BuildersOrCallback::Callback(_, node_callback) => {
2396 node_callback(self, next_stmt_id);
2397 }
2398 }
2399
2400 tee_ident
2401 };
2402
2403 *next_stmt_id += 1;
2407 ret_ident
2408 }
2409
2410 HydroNode::Chain { first, second, .. } => {
2411 let first_ident = first.emit_core(builders_or_callback, built_tees, next_stmt_id);
2412 let second_ident = second.emit_core(builders_or_callback, built_tees, next_stmt_id);
2413
2414 let chain_ident =
2415 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2416
2417 match builders_or_callback {
2418 BuildersOrCallback::Builders(graph_builders) => {
2419 let builder = graph_builders.get_dfir_mut(&out_location);
2420 builder.add_dfir(
2421 parse_quote! {
2422 #chain_ident = chain();
2423 #first_ident -> [0]#chain_ident;
2424 #second_ident -> [1]#chain_ident;
2425 },
2426 None,
2427 Some(&next_stmt_id.to_string()),
2428 );
2429 }
2430 BuildersOrCallback::Callback(_, node_callback) => {
2431 node_callback(self, next_stmt_id);
2432 }
2433 }
2434
2435 *next_stmt_id += 1;
2436
2437 chain_ident
2438 }
2439
2440 HydroNode::ChainFirst { first, second, .. } => {
2441 let first_ident = first.emit_core(builders_or_callback, built_tees, next_stmt_id);
2442 let second_ident = second.emit_core(builders_or_callback, built_tees, next_stmt_id);
2443
2444 let chain_ident =
2445 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2446
2447 match builders_or_callback {
2448 BuildersOrCallback::Builders(graph_builders) => {
2449 let builder = graph_builders.get_dfir_mut(&out_location);
2450 builder.add_dfir(
2451 parse_quote! {
2452 #chain_ident = chain_first_n(1);
2453 #first_ident -> [0]#chain_ident;
2454 #second_ident -> [1]#chain_ident;
2455 },
2456 None,
2457 Some(&next_stmt_id.to_string()),
2458 );
2459 }
2460 BuildersOrCallback::Callback(_, node_callback) => {
2461 node_callback(self, next_stmt_id);
2462 }
2463 }
2464
2465 *next_stmt_id += 1;
2466
2467 chain_ident
2468 }
2469
2470 HydroNode::CrossSingleton { left, right, .. } => {
2471 let left_ident = left.emit_core(builders_or_callback, built_tees, next_stmt_id);
2472 let right_ident = right.emit_core(builders_or_callback, built_tees, next_stmt_id);
2473
2474 let cross_ident =
2475 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2476
2477 match builders_or_callback {
2478 BuildersOrCallback::Builders(graph_builders) => {
2479 let builder = graph_builders.get_dfir_mut(&out_location);
2480 builder.add_dfir(
2481 parse_quote! {
2482 #cross_ident = cross_singleton();
2483 #left_ident -> [input]#cross_ident;
2484 #right_ident -> [single]#cross_ident;
2485 },
2486 None,
2487 Some(&next_stmt_id.to_string()),
2488 );
2489 }
2490 BuildersOrCallback::Callback(_, node_callback) => {
2491 node_callback(self, next_stmt_id);
2492 }
2493 }
2494
2495 *next_stmt_id += 1;
2496
2497 cross_ident
2498 }
2499
2500 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2501 let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
2502 parse_quote!(cross_join_multiset)
2503 } else {
2504 parse_quote!(join_multiset)
2505 };
2506
2507 let (HydroNode::CrossProduct { left, right, .. }
2508 | HydroNode::Join { left, right, .. }) = self
2509 else {
2510 unreachable!()
2511 };
2512
2513 let is_top_level = left.metadata().location_kind.is_top_level()
2514 && right.metadata().location_kind.is_top_level();
2515 let (left_inner, left_lifetime) =
2516 if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
2517 debug_assert!(!left.metadata().location_kind.is_top_level());
2518 (left, quote!('static))
2519 } else if left.metadata().location_kind.is_top_level() {
2520 (left, quote!('static))
2521 } else {
2522 (left, quote!('tick))
2523 };
2524
2525 let (right_inner, right_lifetime) =
2526 if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
2527 debug_assert!(!right.metadata().location_kind.is_top_level());
2528 (right, quote!('static))
2529 } else if right.metadata().location_kind.is_top_level() {
2530 (right, quote!('static))
2531 } else {
2532 (right, quote!('tick))
2533 };
2534
2535 let left_ident =
2536 left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2537 let right_ident =
2538 right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
2539
2540 let stream_ident =
2541 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2542
2543 match builders_or_callback {
2544 BuildersOrCallback::Builders(graph_builders) => {
2545 let builder = graph_builders.get_dfir_mut(&out_location);
2546 builder.add_dfir(
2547 if is_top_level {
2548 parse_quote! {
2551 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2552 #left_ident -> [0]#stream_ident;
2553 #right_ident -> [1]#stream_ident;
2554 }
2555 } else {
2556 parse_quote! {
2557 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2558 #left_ident -> [0]#stream_ident;
2559 #right_ident -> [1]#stream_ident;
2560 }
2561 }
2562 ,
2563 None,
2564 Some(&next_stmt_id.to_string()),
2565 );
2566 }
2567 BuildersOrCallback::Callback(_, node_callback) => {
2568 node_callback(self, next_stmt_id);
2569 }
2570 }
2571
2572 *next_stmt_id += 1;
2573
2574 stream_ident
2575 }
2576
2577 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2578 let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
2579 parse_quote!(difference_multiset)
2580 } else {
2581 parse_quote!(anti_join_multiset)
2582 };
2583
2584 let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
2585 self
2586 else {
2587 unreachable!()
2588 };
2589
2590 let (neg, neg_lifetime) =
2591 if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
2592 debug_assert!(!neg.metadata().location_kind.is_top_level());
2593 (neg, quote!('static))
2594 } else if neg.metadata().location_kind.is_top_level() {
2595 (neg, quote!('static))
2596 } else {
2597 (neg, quote!('tick))
2598 };
2599
2600 let pos_ident = pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
2601 let neg_ident = neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
2602
2603 let stream_ident =
2604 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2605
2606 match builders_or_callback {
2607 BuildersOrCallback::Builders(graph_builders) => {
2608 let builder = graph_builders.get_dfir_mut(&out_location);
2609 builder.add_dfir(
2610 parse_quote! {
2611 #stream_ident = #operator::<'tick, #neg_lifetime>();
2612 #pos_ident -> [pos]#stream_ident;
2613 #neg_ident -> [neg]#stream_ident;
2614 },
2615 None,
2616 Some(&next_stmt_id.to_string()),
2617 );
2618 }
2619 BuildersOrCallback::Callback(_, node_callback) => {
2620 node_callback(self, next_stmt_id);
2621 }
2622 }
2623
2624 *next_stmt_id += 1;
2625
2626 stream_ident
2627 }
2628
2629 HydroNode::ResolveFutures { input, .. } => {
2630 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2631
2632 let futures_ident =
2633 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2634
2635 match builders_or_callback {
2636 BuildersOrCallback::Builders(graph_builders) => {
2637 let builder = graph_builders.get_dfir_mut(&out_location);
2638 builder.add_dfir(
2639 parse_quote! {
2640 #futures_ident = #input_ident -> resolve_futures();
2641 },
2642 None,
2643 Some(&next_stmt_id.to_string()),
2644 );
2645 }
2646 BuildersOrCallback::Callback(_, node_callback) => {
2647 node_callback(self, next_stmt_id);
2648 }
2649 }
2650
2651 *next_stmt_id += 1;
2652
2653 futures_ident
2654 }
2655
2656 HydroNode::ResolveFuturesOrdered { input, .. } => {
2657 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2658
2659 let futures_ident =
2660 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2661
2662 match builders_or_callback {
2663 BuildersOrCallback::Builders(graph_builders) => {
2664 let builder = graph_builders.get_dfir_mut(&out_location);
2665 builder.add_dfir(
2666 parse_quote! {
2667 #futures_ident = #input_ident -> resolve_futures_ordered();
2668 },
2669 None,
2670 Some(&next_stmt_id.to_string()),
2671 );
2672 }
2673 BuildersOrCallback::Callback(_, node_callback) => {
2674 node_callback(self, next_stmt_id);
2675 }
2676 }
2677
2678 *next_stmt_id += 1;
2679
2680 futures_ident
2681 }
2682
2683 HydroNode::Map { f, input, .. } => {
2684 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2685
2686 let map_ident =
2687 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2688
2689 match builders_or_callback {
2690 BuildersOrCallback::Builders(graph_builders) => {
2691 let builder = graph_builders.get_dfir_mut(&out_location);
2692 builder.add_dfir(
2693 parse_quote! {
2694 #map_ident = #input_ident -> map(#f);
2695 },
2696 None,
2697 Some(&next_stmt_id.to_string()),
2698 );
2699 }
2700 BuildersOrCallback::Callback(_, node_callback) => {
2701 node_callback(self, next_stmt_id);
2702 }
2703 }
2704
2705 *next_stmt_id += 1;
2706
2707 map_ident
2708 }
2709
2710 HydroNode::FlatMap { f, input, .. } => {
2711 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2712
2713 let flat_map_ident =
2714 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2715
2716 match builders_or_callback {
2717 BuildersOrCallback::Builders(graph_builders) => {
2718 let builder = graph_builders.get_dfir_mut(&out_location);
2719 builder.add_dfir(
2720 parse_quote! {
2721 #flat_map_ident = #input_ident -> flat_map(#f);
2722 },
2723 None,
2724 Some(&next_stmt_id.to_string()),
2725 );
2726 }
2727 BuildersOrCallback::Callback(_, node_callback) => {
2728 node_callback(self, next_stmt_id);
2729 }
2730 }
2731
2732 *next_stmt_id += 1;
2733
2734 flat_map_ident
2735 }
2736
2737 HydroNode::Filter { f, input, .. } => {
2738 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2739
2740 let filter_ident =
2741 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2742
2743 match builders_or_callback {
2744 BuildersOrCallback::Builders(graph_builders) => {
2745 let builder = graph_builders.get_dfir_mut(&out_location);
2746 builder.add_dfir(
2747 parse_quote! {
2748 #filter_ident = #input_ident -> filter(#f);
2749 },
2750 None,
2751 Some(&next_stmt_id.to_string()),
2752 );
2753 }
2754 BuildersOrCallback::Callback(_, node_callback) => {
2755 node_callback(self, next_stmt_id);
2756 }
2757 }
2758
2759 *next_stmt_id += 1;
2760
2761 filter_ident
2762 }
2763
2764 HydroNode::FilterMap { f, input, .. } => {
2765 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2766
2767 let filter_map_ident =
2768 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2769
2770 match builders_or_callback {
2771 BuildersOrCallback::Builders(graph_builders) => {
2772 let builder = graph_builders.get_dfir_mut(&out_location);
2773 builder.add_dfir(
2774 parse_quote! {
2775 #filter_map_ident = #input_ident -> filter_map(#f);
2776 },
2777 None,
2778 Some(&next_stmt_id.to_string()),
2779 );
2780 }
2781 BuildersOrCallback::Callback(_, node_callback) => {
2782 node_callback(self, next_stmt_id);
2783 }
2784 }
2785
2786 *next_stmt_id += 1;
2787
2788 filter_map_ident
2789 }
2790
2791 HydroNode::Sort { input, .. } => {
2792 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2793
2794 let sort_ident =
2795 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2796
2797 match builders_or_callback {
2798 BuildersOrCallback::Builders(graph_builders) => {
2799 let builder = graph_builders.get_dfir_mut(&out_location);
2800 builder.add_dfir(
2801 parse_quote! {
2802 #sort_ident = #input_ident -> sort();
2803 },
2804 None,
2805 Some(&next_stmt_id.to_string()),
2806 );
2807 }
2808 BuildersOrCallback::Callback(_, node_callback) => {
2809 node_callback(self, next_stmt_id);
2810 }
2811 }
2812
2813 *next_stmt_id += 1;
2814
2815 sort_ident
2816 }
2817
2818 HydroNode::DeferTick { input, .. } => {
2819 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2820
2821 let defer_tick_ident =
2822 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2823
2824 match builders_or_callback {
2825 BuildersOrCallback::Builders(graph_builders) => {
2826 let builder = graph_builders.get_dfir_mut(&out_location);
2827 builder.add_dfir(
2828 parse_quote! {
2829 #defer_tick_ident = #input_ident -> defer_tick_lazy();
2830 },
2831 None,
2832 Some(&next_stmt_id.to_string()),
2833 );
2834 }
2835 BuildersOrCallback::Callback(_, node_callback) => {
2836 node_callback(self, next_stmt_id);
2837 }
2838 }
2839
2840 *next_stmt_id += 1;
2841
2842 defer_tick_ident
2843 }
2844
2845 HydroNode::Enumerate { input, .. } => {
2846 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2847
2848 let enumerate_ident =
2849 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2850
2851 match builders_or_callback {
2852 BuildersOrCallback::Builders(graph_builders) => {
2853 let builder = graph_builders.get_dfir_mut(&out_location);
2854 let lifetime = if input.metadata().location_kind.is_top_level() {
2855 quote!('static)
2856 } else {
2857 quote!('tick)
2858 };
2859 builder.add_dfir(
2860 parse_quote! {
2861 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2862 },
2863 None,
2864 Some(&next_stmt_id.to_string()),
2865 );
2866 }
2867 BuildersOrCallback::Callback(_, node_callback) => {
2868 node_callback(self, next_stmt_id);
2869 }
2870 }
2871
2872 *next_stmt_id += 1;
2873
2874 enumerate_ident
2875 }
2876
2877 HydroNode::Inspect { f, input, .. } => {
2878 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2879
2880 let inspect_ident =
2881 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2882
2883 match builders_or_callback {
2884 BuildersOrCallback::Builders(graph_builders) => {
2885 let builder = graph_builders.get_dfir_mut(&out_location);
2886 builder.add_dfir(
2887 parse_quote! {
2888 #inspect_ident = #input_ident -> inspect(#f);
2889 },
2890 None,
2891 Some(&next_stmt_id.to_string()),
2892 );
2893 }
2894 BuildersOrCallback::Callback(_, node_callback) => {
2895 node_callback(self, next_stmt_id);
2896 }
2897 }
2898
2899 *next_stmt_id += 1;
2900
2901 inspect_ident
2902 }
2903
2904 HydroNode::Unique { input, .. } => {
2905 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2906
2907 let unique_ident =
2908 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2909
2910 match builders_or_callback {
2911 BuildersOrCallback::Builders(graph_builders) => {
2912 let builder = graph_builders.get_dfir_mut(&out_location);
2913 let lifetime = if input.metadata().location_kind.is_top_level() {
2914 quote!('static)
2915 } else {
2916 quote!('tick)
2917 };
2918
2919 builder.add_dfir(
2920 parse_quote! {
2921 #unique_ident = #input_ident -> unique::<#lifetime>();
2922 },
2923 None,
2924 Some(&next_stmt_id.to_string()),
2925 );
2926 }
2927 BuildersOrCallback::Callback(_, node_callback) => {
2928 node_callback(self, next_stmt_id);
2929 }
2930 }
2931
2932 *next_stmt_id += 1;
2933
2934 unique_ident
2935 }
2936
2937 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
2938 let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
2939 parse_quote!(fold)
2940 } else if matches!(self, HydroNode::Scan { .. }) {
2941 parse_quote!(scan)
2942 } else {
2943 parse_quote!(fold_keyed)
2944 };
2945
2946 let (HydroNode::Fold { input, .. }
2947 | HydroNode::FoldKeyed { input, .. }
2948 | HydroNode::Scan { input, .. }) = self
2949 else {
2950 unreachable!()
2951 };
2952
2953 let (input, lifetime) =
2954 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
2955 debug_assert!(!input.metadata().location_kind.is_top_level());
2956 (input, quote!('static))
2957 } else if input.metadata().location_kind.is_top_level() {
2958 (input, quote!('static))
2959 } else {
2960 (input, quote!('tick))
2961 };
2962
2963 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2964
2965 let (HydroNode::Fold { init, acc, .. }
2966 | HydroNode::FoldKeyed { init, acc, .. }
2967 | HydroNode::Scan { init, acc, .. }) = &*self
2968 else {
2969 unreachable!()
2970 };
2971
2972 let fold_ident =
2973 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2974
2975 match builders_or_callback {
2976 BuildersOrCallback::Builders(graph_builders) => {
2977 if matches!(self, HydroNode::Fold { .. })
2978 && self.metadata().location_kind.is_top_level()
2979 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
2980 && graph_builders.singleton_intermediates()
2981 {
2982 let builder = graph_builders.get_dfir_mut(&out_location);
2983
2984 let acc: syn::Expr = parse_quote!({
2985 let mut __inner = #acc;
2986 move |__state, __value| {
2987 __inner(__state, __value);
2988 Some(__state.clone())
2989 }
2990 });
2991
2992 builder.add_dfir(
2993 parse_quote! {
2994 source_iter([(#init)()]) -> [0]#fold_ident;
2995 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
2996 #fold_ident = chain();
2997 },
2998 None,
2999 Some(&next_stmt_id.to_string()),
3000 );
3001 } else if matches!(self, HydroNode::FoldKeyed { .. })
3002 && self.metadata().location_kind.is_top_level()
3003 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3004 && graph_builders.singleton_intermediates()
3005 {
3006 let builder = graph_builders.get_dfir_mut(&out_location);
3007
3008 let acc: syn::Expr = parse_quote!({
3009 let mut __init = #init;
3010 let mut __inner = #acc;
3011 move |__state, (__key, __value)| {
3012 let __state = __state.entry(__key.clone()).or_insert_with(|| (__init)());
3014 __inner(__state, __value);
3015 Some((__key, __state.clone()))
3016 }
3017 });
3018
3019 builder.add_dfir(
3020 parse_quote! {
3021 source_iter([(#init)()]) -> [0]#fold_ident;
3022 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3023 },
3024 None,
3025 Some(&next_stmt_id.to_string()),
3026 );
3027 } else {
3028 let builder = graph_builders.get_dfir_mut(&out_location);
3029 builder.add_dfir(
3030 parse_quote! {
3031 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3032 },
3033 None,
3034 Some(&next_stmt_id.to_string()),
3035 );
3036 }
3037 }
3038 BuildersOrCallback::Callback(_, node_callback) => {
3039 node_callback(self, next_stmt_id);
3040 }
3041 }
3042
3043 *next_stmt_id += 1;
3044
3045 fold_ident
3046 }
3047
3048 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3049 let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
3050 parse_quote!(reduce)
3051 } else {
3052 parse_quote!(reduce_keyed)
3053 };
3054
3055 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = self
3056 else {
3057 unreachable!()
3058 };
3059
3060 let (input, lifetime) =
3061 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
3062 debug_assert!(!input.metadata().location_kind.is_top_level());
3063 (input, quote!('static))
3064 } else if input.metadata().location_kind.is_top_level() {
3065 (input, quote!('static))
3066 } else {
3067 (input, quote!('tick))
3068 };
3069
3070 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3071
3072 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*self
3073 else {
3074 unreachable!()
3075 };
3076
3077 let reduce_ident =
3078 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3079
3080 match builders_or_callback {
3081 BuildersOrCallback::Builders(graph_builders) => {
3082 if matches!(self, HydroNode::Reduce { .. })
3083 && self.metadata().location_kind.is_top_level()
3084 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3085 && graph_builders.singleton_intermediates()
3086 {
3087 todo!(
3088 "Reduce with optional intermediates is not yet supported in simulator"
3089 );
3090 } else if matches!(self, HydroNode::ReduceKeyed { .. })
3091 && self.metadata().location_kind.is_top_level()
3092 && !(matches!(self.metadata().location_kind, LocationId::Atomic(_)))
3093 && graph_builders.singleton_intermediates()
3094 {
3095 todo!();
3096 } else {
3097 let builder = graph_builders.get_dfir_mut(&out_location);
3098 builder.add_dfir(
3099 parse_quote! {
3100 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3101 },
3102 None,
3103 Some(&next_stmt_id.to_string()),
3104 );
3105 }
3106 }
3107 BuildersOrCallback::Callback(_, node_callback) => {
3108 node_callback(self, next_stmt_id);
3109 }
3110 }
3111
3112 *next_stmt_id += 1;
3113
3114 reduce_ident
3115 }
3116
3117 HydroNode::ReduceKeyedWatermark {
3118 f,
3119 input,
3120 watermark,
3121 ..
3122 } => {
3123 let (input, lifetime) =
3124 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
3125 debug_assert!(!input.metadata().location_kind.is_top_level());
3126 (input, quote!('static))
3127 } else if input.metadata().location_kind.is_top_level() {
3128 (input, quote!('static))
3129 } else {
3130 (input, quote!('tick))
3131 };
3132
3133 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3134
3135 let watermark_ident =
3136 watermark.emit_core(builders_or_callback, built_tees, next_stmt_id);
3137
3138 let chain_ident = syn::Ident::new(
3139 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3140 Span::call_site(),
3141 );
3142
3143 let fold_ident =
3144 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3145
3146 match builders_or_callback {
3147 BuildersOrCallback::Builders(graph_builders) => {
3148 let builder = graph_builders.get_dfir_mut(&out_location);
3149 builder.add_dfir(
3154 parse_quote! {
3155 #chain_ident = chain();
3156 #input_ident
3157 -> map(|x| (Some(x), None))
3158 -> [0]#chain_ident;
3159 #watermark_ident
3160 -> map(|watermark| (None, Some(watermark)))
3161 -> [1]#chain_ident;
3162
3163 #fold_ident = #chain_ident
3164 -> fold::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3165 let __reduce_keyed_fn = #f;
3166 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3167 if let Some((k, v)) = opt_payload {
3168 if let Some(curr_watermark) = *opt_curr_watermark {
3169 if k <= curr_watermark {
3170 return;
3171 }
3172 }
3173 match map.entry(k) {
3174 ::std::collections::hash_map::Entry::Vacant(e) => {
3175 e.insert(v);
3176 }
3177 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3178 __reduce_keyed_fn(e.get_mut(), v);
3179 }
3180 }
3181 } else {
3182 let watermark = opt_watermark.unwrap();
3183 if let Some(curr_watermark) = *opt_curr_watermark {
3184 if watermark <= curr_watermark {
3185 return;
3186 }
3187 }
3188 *opt_curr_watermark = opt_watermark;
3189 map.retain(|k, _| *k > watermark);
3190 }
3191 }
3192 })
3193 -> flat_map(|(map, _curr_watermark)| map);
3194 },
3195 None,
3196 Some(&next_stmt_id.to_string()),
3197 );
3198 }
3199 BuildersOrCallback::Callback(_, node_callback) => {
3200 node_callback(self, next_stmt_id);
3201 }
3202 }
3203
3204 *next_stmt_id += 1;
3205
3206 fold_ident
3207 }
3208
3209 HydroNode::Network {
3210 serialize_fn: serialize_pipeline,
3211 instantiate_fn,
3212 deserialize_fn: deserialize_pipeline,
3213 input,
3214 ..
3215 } => {
3216 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3217
3218 let receiver_stream_ident =
3219 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3220
3221 match builders_or_callback {
3222 BuildersOrCallback::Builders(graph_builders) => {
3223 let (sink_expr, source_expr) = match instantiate_fn {
3224 DebugInstantiate::Building => (
3225 syn::parse_quote!(DUMMY_SINK),
3226 syn::parse_quote!(DUMMY_SOURCE),
3227 ),
3228
3229 DebugInstantiate::Finalized(finalized) => {
3230 (finalized.sink.clone(), finalized.source.clone())
3231 }
3232 };
3233
3234 graph_builders.create_network(
3235 &input.metadata().location_kind,
3236 &out_location,
3237 input_ident,
3238 &receiver_stream_ident,
3239 serialize_pipeline,
3240 sink_expr,
3241 source_expr,
3242 deserialize_pipeline,
3243 *next_stmt_id,
3244 );
3245 }
3246 BuildersOrCallback::Callback(_, node_callback) => {
3247 node_callback(self, next_stmt_id);
3248 }
3249 }
3250
3251 *next_stmt_id += 1;
3252
3253 receiver_stream_ident
3254 }
3255
3256 HydroNode::ExternalInput {
3257 instantiate_fn,
3258 deserialize_fn: deserialize_pipeline,
3259 ..
3260 } => {
3261 let receiver_stream_ident =
3262 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3263
3264 match builders_or_callback {
3265 BuildersOrCallback::Builders(graph_builders) => {
3266 let (_, source_expr) = match instantiate_fn {
3267 DebugInstantiate::Building => (
3268 syn::parse_quote!(DUMMY_SINK),
3269 syn::parse_quote!(DUMMY_SOURCE),
3270 ),
3271
3272 DebugInstantiate::Finalized(finalized) => {
3273 (finalized.sink.clone(), finalized.source.clone())
3274 }
3275 };
3276
3277 graph_builders.create_external_source(
3278 &out_location,
3279 source_expr,
3280 &receiver_stream_ident,
3281 deserialize_pipeline,
3282 *next_stmt_id,
3283 );
3284 }
3285 BuildersOrCallback::Callback(_, node_callback) => {
3286 node_callback(self, next_stmt_id);
3287 }
3288 }
3289
3290 *next_stmt_id += 1;
3291
3292 receiver_stream_ident
3293 }
3294
3295 HydroNode::Counter {
3296 tag,
3297 duration,
3298 prefix,
3299 input,
3300 ..
3301 } => {
3302 let input_ident = input.emit_core(builders_or_callback, built_tees, next_stmt_id);
3303
3304 let counter_ident =
3305 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3306
3307 match builders_or_callback {
3308 BuildersOrCallback::Builders(graph_builders) => {
3309 let builder = graph_builders.get_dfir_mut(&out_location);
3310 builder.add_dfir(
3311 parse_quote! {
3312 #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3313 },
3314 None,
3315 Some(&next_stmt_id.to_string()),
3316 );
3317 }
3318 BuildersOrCallback::Callback(_, node_callback) => {
3319 node_callback(self, next_stmt_id);
3320 }
3321 }
3322
3323 *next_stmt_id += 1;
3324
3325 counter_ident
3326 }
3327 }
3328 }
3329
3330 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3331 match self {
3332 HydroNode::Placeholder => {
3333 panic!()
3334 }
3335 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3336 HydroNode::Source { source, .. } => match source {
3337 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3338 HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
3339 },
3340 HydroNode::SingletonSource { value, .. } => {
3341 transform(value);
3342 }
3343 HydroNode::CycleSource { .. }
3344 | HydroNode::Tee { .. }
3345 | HydroNode::Persist { .. }
3346 | HydroNode::YieldConcat { .. }
3347 | HydroNode::BeginAtomic { .. }
3348 | HydroNode::EndAtomic { .. }
3349 | HydroNode::Batch { .. }
3350 | HydroNode::Chain { .. }
3351 | HydroNode::ChainFirst { .. }
3352 | HydroNode::CrossProduct { .. }
3353 | HydroNode::CrossSingleton { .. }
3354 | HydroNode::ResolveFutures { .. }
3355 | HydroNode::ResolveFuturesOrdered { .. }
3356 | HydroNode::Join { .. }
3357 | HydroNode::Difference { .. }
3358 | HydroNode::AntiJoin { .. }
3359 | HydroNode::DeferTick { .. }
3360 | HydroNode::Enumerate { .. }
3361 | HydroNode::Unique { .. }
3362 | HydroNode::Sort { .. } => {}
3363 HydroNode::Map { f, .. }
3364 | HydroNode::FlatMap { f, .. }
3365 | HydroNode::Filter { f, .. }
3366 | HydroNode::FilterMap { f, .. }
3367 | HydroNode::Inspect { f, .. }
3368 | HydroNode::Reduce { f, .. }
3369 | HydroNode::ReduceKeyed { f, .. }
3370 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3371 transform(f);
3372 }
3373 HydroNode::Fold { init, acc, .. }
3374 | HydroNode::Scan { init, acc, .. }
3375 | HydroNode::FoldKeyed { init, acc, .. } => {
3376 transform(init);
3377 transform(acc);
3378 }
3379 HydroNode::Network {
3380 serialize_fn,
3381 deserialize_fn,
3382 ..
3383 } => {
3384 if let Some(serialize_fn) = serialize_fn {
3385 transform(serialize_fn);
3386 }
3387 if let Some(deserialize_fn) = deserialize_fn {
3388 transform(deserialize_fn);
3389 }
3390 }
3391 HydroNode::ExternalInput { deserialize_fn, .. } => {
3392 if let Some(deserialize_fn) = deserialize_fn {
3393 transform(deserialize_fn);
3394 }
3395 }
3396 HydroNode::Counter { duration, .. } => {
3397 transform(duration);
3398 }
3399 }
3400 }
3401
3402 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3403 &self.metadata().op
3404 }
3405
3406 pub fn metadata(&self) -> &HydroIrMetadata {
3407 match self {
3408 HydroNode::Placeholder => {
3409 panic!()
3410 }
3411 HydroNode::Cast { metadata, .. } => metadata,
3412 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3413 HydroNode::Source { metadata, .. } => metadata,
3414 HydroNode::SingletonSource { metadata, .. } => metadata,
3415 HydroNode::CycleSource { metadata, .. } => metadata,
3416 HydroNode::Tee { metadata, .. } => metadata,
3417 HydroNode::Persist { metadata, .. } => metadata,
3418 HydroNode::YieldConcat { metadata, .. } => metadata,
3419 HydroNode::BeginAtomic { metadata, .. } => metadata,
3420 HydroNode::EndAtomic { metadata, .. } => metadata,
3421 HydroNode::Batch { metadata, .. } => metadata,
3422 HydroNode::Chain { metadata, .. } => metadata,
3423 HydroNode::ChainFirst { metadata, .. } => metadata,
3424 HydroNode::CrossProduct { metadata, .. } => metadata,
3425 HydroNode::CrossSingleton { metadata, .. } => metadata,
3426 HydroNode::Join { metadata, .. } => metadata,
3427 HydroNode::Difference { metadata, .. } => metadata,
3428 HydroNode::AntiJoin { metadata, .. } => metadata,
3429 HydroNode::ResolveFutures { metadata, .. } => metadata,
3430 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3431 HydroNode::Map { metadata, .. } => metadata,
3432 HydroNode::FlatMap { metadata, .. } => metadata,
3433 HydroNode::Filter { metadata, .. } => metadata,
3434 HydroNode::FilterMap { metadata, .. } => metadata,
3435 HydroNode::DeferTick { metadata, .. } => metadata,
3436 HydroNode::Enumerate { metadata, .. } => metadata,
3437 HydroNode::Inspect { metadata, .. } => metadata,
3438 HydroNode::Unique { metadata, .. } => metadata,
3439 HydroNode::Sort { metadata, .. } => metadata,
3440 HydroNode::Scan { metadata, .. } => metadata,
3441 HydroNode::Fold { metadata, .. } => metadata,
3442 HydroNode::FoldKeyed { metadata, .. } => metadata,
3443 HydroNode::Reduce { metadata, .. } => metadata,
3444 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3445 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3446 HydroNode::ExternalInput { metadata, .. } => metadata,
3447 HydroNode::Network { metadata, .. } => metadata,
3448 HydroNode::Counter { metadata, .. } => metadata,
3449 }
3450 }
3451
3452 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3453 &mut self.metadata_mut().op
3454 }
3455
3456 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3457 match self {
3458 HydroNode::Placeholder => {
3459 panic!()
3460 }
3461 HydroNode::Cast { metadata, .. } => metadata,
3462 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3463 HydroNode::Source { metadata, .. } => metadata,
3464 HydroNode::SingletonSource { metadata, .. } => metadata,
3465 HydroNode::CycleSource { metadata, .. } => metadata,
3466 HydroNode::Tee { metadata, .. } => metadata,
3467 HydroNode::Persist { metadata, .. } => metadata,
3468 HydroNode::YieldConcat { metadata, .. } => metadata,
3469 HydroNode::BeginAtomic { metadata, .. } => metadata,
3470 HydroNode::EndAtomic { metadata, .. } => metadata,
3471 HydroNode::Batch { metadata, .. } => metadata,
3472 HydroNode::Chain { metadata, .. } => metadata,
3473 HydroNode::ChainFirst { metadata, .. } => metadata,
3474 HydroNode::CrossProduct { metadata, .. } => metadata,
3475 HydroNode::CrossSingleton { metadata, .. } => metadata,
3476 HydroNode::Join { metadata, .. } => metadata,
3477 HydroNode::Difference { metadata, .. } => metadata,
3478 HydroNode::AntiJoin { metadata, .. } => metadata,
3479 HydroNode::ResolveFutures { metadata, .. } => metadata,
3480 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3481 HydroNode::Map { metadata, .. } => metadata,
3482 HydroNode::FlatMap { metadata, .. } => metadata,
3483 HydroNode::Filter { metadata, .. } => metadata,
3484 HydroNode::FilterMap { metadata, .. } => metadata,
3485 HydroNode::DeferTick { metadata, .. } => metadata,
3486 HydroNode::Enumerate { metadata, .. } => metadata,
3487 HydroNode::Inspect { metadata, .. } => metadata,
3488 HydroNode::Unique { metadata, .. } => metadata,
3489 HydroNode::Sort { metadata, .. } => metadata,
3490 HydroNode::Scan { metadata, .. } => metadata,
3491 HydroNode::Fold { metadata, .. } => metadata,
3492 HydroNode::FoldKeyed { metadata, .. } => metadata,
3493 HydroNode::Reduce { metadata, .. } => metadata,
3494 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3495 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3496 HydroNode::ExternalInput { metadata, .. } => metadata,
3497 HydroNode::Network { metadata, .. } => metadata,
3498 HydroNode::Counter { metadata, .. } => metadata,
3499 }
3500 }
3501
3502 pub fn input(&self) -> Vec<&HydroNode> {
3503 match self {
3504 HydroNode::Placeholder => {
3505 panic!()
3506 }
3507 HydroNode::Source { .. }
3508 | HydroNode::SingletonSource { .. }
3509 | HydroNode::ExternalInput { .. }
3510 | HydroNode::CycleSource { .. }
3511 | HydroNode::Tee { .. } => {
3512 vec![]
3514 }
3515 HydroNode::Cast { inner, .. }
3516 | HydroNode::ObserveNonDet { inner, .. }
3517 | HydroNode::Persist { inner, .. }
3518 | HydroNode::YieldConcat { inner, .. }
3519 | HydroNode::BeginAtomic { inner, .. }
3520 | HydroNode::EndAtomic { inner, .. }
3521 | HydroNode::Batch { inner, .. } => {
3522 vec![inner]
3523 }
3524 HydroNode::Chain { first, second, .. } => {
3525 vec![first, second]
3526 }
3527 HydroNode::ChainFirst { first, second, .. } => {
3528 vec![first, second]
3529 }
3530 HydroNode::CrossProduct { left, right, .. }
3531 | HydroNode::CrossSingleton { left, right, .. }
3532 | HydroNode::Join { left, right, .. } => {
3533 vec![left, right]
3534 }
3535 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3536 vec![pos, neg]
3537 }
3538 HydroNode::Map { input, .. }
3539 | HydroNode::FlatMap { input, .. }
3540 | HydroNode::Filter { input, .. }
3541 | HydroNode::FilterMap { input, .. }
3542 | HydroNode::Sort { input, .. }
3543 | HydroNode::DeferTick { input, .. }
3544 | HydroNode::Enumerate { input, .. }
3545 | HydroNode::Inspect { input, .. }
3546 | HydroNode::Unique { input, .. }
3547 | HydroNode::Network { input, .. }
3548 | HydroNode::Counter { input, .. }
3549 | HydroNode::ResolveFutures { input, .. }
3550 | HydroNode::ResolveFuturesOrdered { input, .. } => {
3551 vec![input]
3552 }
3553 HydroNode::Fold { input, .. }
3554 | HydroNode::FoldKeyed { input, .. }
3555 | HydroNode::Reduce { input, .. }
3556 | HydroNode::ReduceKeyed { input, .. }
3557 | HydroNode::Scan { input, .. } => {
3558 if let HydroNode::Persist { inner, .. } = input.as_ref() {
3560 vec![inner]
3561 } else {
3562 vec![input]
3563 }
3564 }
3565 HydroNode::ReduceKeyedWatermark {
3566 input, watermark, ..
3567 } => {
3568 if let HydroNode::Persist { inner, .. } = input.as_ref() {
3570 vec![inner, watermark]
3571 } else {
3572 vec![input, watermark]
3573 }
3574 }
3575 }
3576 }
3577
3578 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3579 self.input()
3580 .iter()
3581 .map(|input_node| input_node.metadata())
3582 .collect()
3583 }
3584
3585 pub fn print_root(&self) -> String {
3586 match self {
3587 HydroNode::Placeholder => {
3588 panic!()
3589 }
3590 HydroNode::Cast { .. } => "Cast()".to_string(),
3591 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_string(),
3592 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3593 HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3594 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3595 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3596 HydroNode::Persist { .. } => "Persist()".to_string(),
3597 HydroNode::YieldConcat { .. } => "YieldConcat()".to_string(),
3598 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_string(),
3599 HydroNode::EndAtomic { .. } => "EndAtomic()".to_string(),
3600 HydroNode::Batch { .. } => "Batch()".to_string(),
3601 HydroNode::Chain { first, second, .. } => {
3602 format!("Chain({}, {})", first.print_root(), second.print_root())
3603 }
3604 HydroNode::ChainFirst { first, second, .. } => {
3605 format!(
3606 "ChainFirst({}, {})",
3607 first.print_root(),
3608 second.print_root()
3609 )
3610 }
3611 HydroNode::CrossProduct { left, right, .. } => {
3612 format!(
3613 "CrossProduct({}, {})",
3614 left.print_root(),
3615 right.print_root()
3616 )
3617 }
3618 HydroNode::CrossSingleton { left, right, .. } => {
3619 format!(
3620 "CrossSingleton({}, {})",
3621 left.print_root(),
3622 right.print_root()
3623 )
3624 }
3625 HydroNode::Join { left, right, .. } => {
3626 format!("Join({}, {})", left.print_root(), right.print_root())
3627 }
3628 HydroNode::Difference { pos, neg, .. } => {
3629 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3630 }
3631 HydroNode::AntiJoin { pos, neg, .. } => {
3632 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3633 }
3634 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
3635 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
3636 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3637 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3638 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3639 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3640 HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
3641 HydroNode::Enumerate { .. } => "Enumerate()".to_string(),
3642 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3643 HydroNode::Unique { .. } => "Unique()".to_string(),
3644 HydroNode::Sort { .. } => "Sort()".to_string(),
3645 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3646 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3647 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3648 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3649 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3650 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3651 HydroNode::Network { .. } => "Network()".to_string(),
3652 HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3653 HydroNode::Counter { tag, duration, .. } => {
3654 format!("Counter({:?}, {:?})", tag, duration)
3655 }
3656 }
3657 }
3658}
3659
3660#[cfg(feature = "build")]
3661fn instantiate_network<'a, D>(
3662 from_location: &LocationId,
3663 to_location: &LocationId,
3664 processes: &HashMap<usize, D::Process>,
3665 clusters: &HashMap<usize, D::Cluster>,
3666 compile_env: &D::CompileEnv,
3667) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
3668where
3669 D: Deploy<'a>,
3670{
3671 let ((sink, source), connect_fn) = match (from_location, to_location) {
3672 (LocationId::Process(from), LocationId::Process(to)) => {
3673 let from_node = processes
3674 .get(from)
3675 .unwrap_or_else(|| {
3676 panic!("A process used in the graph was not instantiated: {}", from)
3677 })
3678 .clone();
3679 let to_node = processes
3680 .get(to)
3681 .unwrap_or_else(|| {
3682 panic!("A process used in the graph was not instantiated: {}", to)
3683 })
3684 .clone();
3685
3686 let sink_port = D::allocate_process_port(&from_node);
3687 let source_port = D::allocate_process_port(&to_node);
3688
3689 (
3690 D::o2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3691 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
3692 )
3693 }
3694 (LocationId::Process(from), LocationId::Cluster(to)) => {
3695 let from_node = processes
3696 .get(from)
3697 .unwrap_or_else(|| {
3698 panic!("A process used in the graph was not instantiated: {}", from)
3699 })
3700 .clone();
3701 let to_node = clusters
3702 .get(to)
3703 .unwrap_or_else(|| {
3704 panic!("A cluster used in the graph was not instantiated: {}", to)
3705 })
3706 .clone();
3707
3708 let sink_port = D::allocate_process_port(&from_node);
3709 let source_port = D::allocate_cluster_port(&to_node);
3710
3711 (
3712 D::o2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3713 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
3714 )
3715 }
3716 (LocationId::Cluster(from), LocationId::Process(to)) => {
3717 let from_node = clusters
3718 .get(from)
3719 .unwrap_or_else(|| {
3720 panic!("A cluster used in the graph was not instantiated: {}", from)
3721 })
3722 .clone();
3723 let to_node = processes
3724 .get(to)
3725 .unwrap_or_else(|| {
3726 panic!("A process used in the graph was not instantiated: {}", to)
3727 })
3728 .clone();
3729
3730 let sink_port = D::allocate_cluster_port(&from_node);
3731 let source_port = D::allocate_process_port(&to_node);
3732
3733 (
3734 D::m2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3735 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
3736 )
3737 }
3738 (LocationId::Cluster(from), LocationId::Cluster(to)) => {
3739 let from_node = clusters
3740 .get(from)
3741 .unwrap_or_else(|| {
3742 panic!("A cluster used in the graph was not instantiated: {}", from)
3743 })
3744 .clone();
3745 let to_node = clusters
3746 .get(to)
3747 .unwrap_or_else(|| {
3748 panic!("A cluster used in the graph was not instantiated: {}", to)
3749 })
3750 .clone();
3751
3752 let sink_port = D::allocate_cluster_port(&from_node);
3753 let source_port = D::allocate_cluster_port(&to_node);
3754
3755 (
3756 D::m2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
3757 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
3758 )
3759 }
3760 (LocationId::Tick(_, _), _) => panic!(),
3761 (_, LocationId::Tick(_, _)) => panic!(),
3762 (LocationId::Atomic(_), _) => panic!(),
3763 (_, LocationId::Atomic(_)) => panic!(),
3764 };
3765 (sink, source, connect_fn)
3766}
3767
3768#[cfg(test)]
3769mod test {
3770 use std::mem::size_of;
3771
3772 use stageleft::{QuotedWithContext, q};
3773
3774 use super::*;
3775
3776 #[test]
3777 fn hydro_node_size() {
3778 assert_eq!(size_of::<HydroNode>(), 264);
3779 }
3780
3781 #[test]
3782 fn hydro_root_size() {
3783 assert_eq!(size_of::<HydroRoot>(), 160);
3784 }
3785
3786 #[test]
3787 fn test_simplify_q_macro_basic() {
3788 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
3790 let result = simplify_q_macro(simple_expr.clone());
3791 assert_eq!(result, simple_expr);
3792 }
3793
3794 #[test]
3795 fn test_simplify_q_macro_actual_stageleft_call() {
3796 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
3798 let result = simplify_q_macro(stageleft_call);
3799 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3802 }
3803
3804 #[test]
3805 fn test_closure_no_pipe_at_start() {
3806 let stageleft_call = q!({
3808 let foo = 123;
3809 move |b: usize| b + foo
3810 })
3811 .splice_fn1_ctx(&());
3812 let result = simplify_q_macro(stageleft_call);
3813 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3814 }
3815}