1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24use crate::compile::builder::ExternalPortId;
25#[cfg(feature = "build")]
26use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
27use crate::location::NetworkHint;
28use crate::location::dynamic::LocationId;
29
30pub mod backtrace;
31use backtrace::Backtrace;
32
/// Newtype around a boxed [`syn::Expr`].
///
/// The wrapper exists so the expression can carry its own `Debug` and
/// `Display` implementations (both render the expression as tokens rather than
/// as a `syn` syntax tree dump).
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
38
39impl From<syn::Expr> for DebugExpr {
40 fn from(expr: syn::Expr) -> Self {
41 Self(Box::new(expr))
42 }
43}
44
45impl Deref for DebugExpr {
46 type Target = syn::Expr;
47
48 fn deref(&self) -> &Self::Target {
49 &self.0
50 }
51}
52
53impl ToTokens for DebugExpr {
54 fn to_tokens(&self, tokens: &mut TokenStream) {
55 self.0.to_tokens(tokens);
56 }
57}
58
59impl Debug for DebugExpr {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 write!(f, "{}", self.0.to_token_stream())
62 }
63}
64
65impl Display for DebugExpr {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 let original = self.0.as_ref().clone();
68 let simplified = simplify_q_macro(original);
69
70 write!(f, "q!({})", quote::quote!(#simplified))
73 }
74}
75
76fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
78 let mut simplifier = QMacroSimplifier::new();
81 simplifier.visit_expr_mut(&mut expr);
82
83 if let Some(simplified) = simplifier.simplified_result {
85 simplified
86 } else {
87 expr
88 }
89}
90
/// `VisitMut` pass that looks for a `stageleft::runtime_support::*_type_hint*`
/// call and records the closure found among its arguments.
#[derive(Default)]
pub struct QMacroSimplifier {
    /// The extracted closure, once one has been found; `None` until then.
    pub simplified_result: Option<syn::Expr>,
}
96
97impl QMacroSimplifier {
98 pub fn new() -> Self {
99 Self::default()
100 }
101}
102
103impl VisitMut for QMacroSimplifier {
104 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
105 if self.simplified_result.is_some() {
107 return;
108 }
109
110 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
111 && self.is_stageleft_runtime_support_call(&path_expr.path)
113 && let Some(closure) = self.extract_closure_from_args(&call.args)
115 {
116 self.simplified_result = Some(closure);
117 return;
118 }
119
120 syn::visit_mut::visit_expr_mut(self, expr);
123 }
124}
125
126impl QMacroSimplifier {
127 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
128 if let Some(last_segment) = path.segments.last() {
130 let fn_name = last_segment.ident.to_string();
131 fn_name.contains("_type_hint")
133 && path.segments.len() > 2
134 && path.segments[0].ident == "stageleft"
135 && path.segments[1].ident == "runtime_support"
136 } else {
137 false
138 }
139 }
140
141 fn extract_closure_from_args(
142 &self,
143 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
144 ) -> Option<syn::Expr> {
145 for arg in args {
147 if let syn::Expr::Closure(_) = arg {
148 return Some(arg.clone());
149 }
150 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
152 return Some(closure_expr);
153 }
154 }
155 None
156 }
157
158 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
159 let mut visitor = ClosureFinder {
160 found_closure: None,
161 prefer_inner_blocks: true,
162 };
163 visitor.visit_expr(expr);
164 visitor.found_closure
165 }
166}
167
/// Read-only visitor that records the first closure-bearing expression it meets.
struct ClosureFinder {
    /// The recorded expression; the visitor stops looking once this is `Some`.
    found_closure: Option<syn::Expr>,
    /// When `true`, a block statement that is itself a block containing a
    /// closure is recorded as a whole instead of the bare closure.
    prefer_inner_blocks: bool,
}
173
174impl<'ast> Visit<'ast> for ClosureFinder {
175 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
176 if self.found_closure.is_some() {
178 return;
179 }
180
181 match expr {
182 syn::Expr::Closure(_) => {
183 self.found_closure = Some(expr.clone());
184 }
185 syn::Expr::Block(block) if self.prefer_inner_blocks => {
186 for stmt in &block.block.stmts {
188 if let syn::Stmt::Expr(stmt_expr, _) = stmt
189 && let syn::Expr::Block(_) = stmt_expr
190 {
191 let mut inner_visitor = ClosureFinder {
193 found_closure: None,
194 prefer_inner_blocks: false, };
196 inner_visitor.visit_expr(stmt_expr);
197 if inner_visitor.found_closure.is_some() {
198 self.found_closure = Some(stmt_expr.clone());
200 return;
201 }
202 }
203 }
204
205 visit::visit_expr(self, expr);
207
208 if self.found_closure.is_some() {
211 }
213 }
214 _ => {
215 visit::visit_expr(self, expr);
217 }
218 }
219 }
220}
221
/// Newtype around a boxed [`syn::Type`] whose `Debug` output is the type
/// rendered as tokens.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
227
228impl From<syn::Type> for DebugType {
229 fn from(t: syn::Type) -> Self {
230 Self(Box::new(t))
231 }
232}
233
234impl Deref for DebugType {
235 type Target = syn::Type;
236
237 fn deref(&self) -> &Self::Target {
238 &self.0
239 }
240}
241
242impl ToTokens for DebugType {
243 fn to_tokens(&self, tokens: &mut TokenStream) {
244 self.0.to_tokens(tokens);
245 }
246}
247
248impl Debug for DebugType {
249 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250 write!(f, "{}", self.0.to_token_stream())
251 }
252}
253
/// Instantiation state of a network-like IR element.
pub enum DebugInstantiate {
    /// Nothing has been instantiated yet.
    Building,
    /// Sink/source expressions and the connect callback have been produced.
    Finalized(Box<DebugInstantiateFinalized>),
}
258
/// Payload of [`DebugInstantiate::Finalized`].
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    // Expression used as the sending side when DFIR is emitted.
    sink: syn::Expr,
    // Expression used as the receiving side when DFIR is emitted.
    source: syn::Expr,
    // One-shot callback; `connect_network` takes it out and calls it.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
271
272impl From<DebugInstantiateFinalized> for DebugInstantiate {
273 fn from(f: DebugInstantiateFinalized) -> Self {
274 Self::Finalized(Box::new(f))
275 }
276}
277
278impl Debug for DebugInstantiate {
279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280 write!(f, "<network instantiate>")
281 }
282}
283
/// Contributes nothing to the hash: every `DebugInstantiate` hashes the same.
impl Hash for DebugInstantiate {
    fn hash<H: Hasher>(&self, _state: &mut H) {
        // Intentionally empty.
    }
}
289
290impl Clone for DebugInstantiate {
291 fn clone(&self) -> Self {
292 match self {
293 DebugInstantiate::Building => DebugInstantiate::Building,
294 DebugInstantiate::Finalized(_) => {
295 panic!("DebugInstantiate::Finalized should not be cloned")
296 }
297 }
298 }
299}
300
/// The kinds of source an IR graph can read from.
// NOTE(review): variant meanings below are inferred from their names and
// payloads only; the code that consumes them is not visible here.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Carries an expression (presumably one producing a stream).
    Stream(DebugExpr),
    /// Carries no payload.
    ExternalNetwork(),
    /// Carries an expression (presumably one producing an iterator).
    Iter(DebugExpr),
    /// Carries no payload.
    Spin(),
    /// Carries the location whose cluster members are referenced.
    ClusterMembers(LocationId),
}
310
/// Back end that receives the DFIR statements produced while emitting the IR.
///
/// Each method corresponds to one kind of IR construct; implementors decide
/// which DFIR code to add and to which per-location graph.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Implementation-defined flag.
    // NOTE(review): exact meaning is not visible here; the `BTreeMap`
    // implementation in this file always returns `false`.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the mutable graph builder associated with `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Connects `in_ident` (at `in_location`, of kind `in_kind`) to `out_ident`
    /// for a batch operation.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Connects `in_ident` to `out_ident` for a yield-from-tick operation.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Connects `in_ident` to `out_ident` at the start of an atomic section.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Connects `in_ident` to `out_ident` at the end of an atomic section.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Connects `in_ident` to `out_ident` for a nondeterminism observation at
    /// `location`.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Creates both halves of a network edge: a sender at `from` fed by
    /// `input_ident` writing into `sink`, and a receiver at `to` reading from
    /// `source` and defining `out_ident`. `serialize` / `deserialize` are
    /// optional mapping stages; `tag_id` identifies the edge.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Creates a receiver at `on` that reads from `source_expr` and defines
    /// `out_ident`, with an optional `deserialize` mapping stage.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    );

    /// Creates a sender at `on` that feeds `input_ident` into `sink_expr`,
    /// with an optional `serialize` mapping stage.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    );
}
403
/// Default builder: one [`FlatGraphBuilder`] per root location id.
#[cfg(feature = "build")]
impl DfirBuilder for BTreeMap<usize, FlatGraphBuilder> {
    /// This builder always reports `false`.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Looks up (creating on first use) the graph keyed by the raw id of the
    /// root of `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_id = location.root().raw_id();
        self.entry(root_id).or_default()
    }

    /// Aliases the input as the output; bounded singleton-like inputs are
    /// additionally passed through `persist::<'static>()`.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let graph = self.get_dfir_mut(in_location.root());

        let is_singleton_like = matches!(
            in_kind,
            CollectionKind::Singleton { .. }
                | CollectionKind::Optional { .. }
                | CollectionKind::KeyedSingleton { .. }
        );

        if !(in_kind.is_bounded() && is_singleton_like) {
            graph.add_dfir(
                parse_quote! {
                    #out_ident = #in_ident;
                },
                None,
                None,
            );
            return;
        }

        assert!(in_location.is_top_level());
        graph.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Aliases the input as the output in the graph of the input's root location.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Aliases the input as the output in the graph of the input's root location.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Aliases the input as the output in the graph of the input's root location.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        self.get_dfir_mut(in_location.root()).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Aliases the input as the output in the graph of `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        self.get_dfir_mut(location).add_dfir(
            parse_quote! {
                #out_ident = #in_ident;
            },
            None,
            None,
        );
    }

    /// Adds a `dest_sink` statement to the sender's graph (tagged `send<tag_id>`)
    /// and a `source_stream` statement to the receiver's graph (tagged
    /// `recv<tag_id>`), inserting `map(...)` stages when (de)serializers are given.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        // Sending half.
        let send_tag = format!("send{tag_id}");
        let sender = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }

        // Receiving half.
        let recv_tag = format!("recv{tag_id}");
        let receiver = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Adds a `source_stream` statement (tagged `recv<tag_id>`) to the graph of
    /// `on`, followed by `map(...)` when a deserializer is given.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        let recv_tag = format!("recv{tag_id}");
        let receiver = self.get_dfir_mut(on);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
            None => {
                receiver.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(&recv_tag),
                );
            }
        }
    }

    /// Adds a `dest_sink` statement (tagged `send<tag_id>`) to the graph of
    /// `on`, preceded by `map(...)` when a serializer is given.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: &Option<DebugExpr>,
        tag_id: usize,
    ) {
        let send_tag = format!("send{tag_id}");
        let sender = self.get_dfir_mut(on);
        match serialize {
            Some(serialize_fn) => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
            None => {
                sender.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(&send_tag),
                );
            }
        }
    }
}
634
/// What the emit traversal should do at each IR element: either write DFIR
/// into a [`DfirBuilder`], or invoke user callbacks instead.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Emit DFIR statements into the given builder.
    Builders(&'a mut dyn DfirBuilder),
    /// Call `L` for roots and `N` for nodes, passing the current statement id.
    Callback(L, N),
}
644
/// A terminal element of the IR graph: something that consumes the output of
/// a [`HydroNode`] tree.
#[derive(Debug, Hash)]
pub enum HydroRoot {
    /// Runs `f` on every element of `input` (emitted as `for_each`).
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends `input` to a port of an external, optionally serializing first.
    SendExternal {
        to_external_id: usize,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Writes `input` into the sink expression (emitted as `dest_sink`).
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Binds `input` to `ident` (emitted as `ident = input -> identity()`).
    CycleSink {
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
676
677impl HydroRoot {
678 #[cfg(feature = "build")]
679 pub fn compile_network<'a, D>(
680 &mut self,
681 extra_stmts: &mut BTreeMap<usize, Vec<syn::Stmt>>,
682 seen_tees: &mut SeenTees,
683 processes: &HashMap<usize, D::Process>,
684 clusters: &HashMap<usize, D::Cluster>,
685 externals: &HashMap<usize, D::External>,
686 ) where
687 D: Deploy<'a>,
688 {
689 let refcell_extra_stmts = RefCell::new(extra_stmts);
690 self.transform_bottom_up(
691 &mut |l| {
692 if let HydroRoot::SendExternal {
693 input,
694 to_external_id,
695 to_port_id,
696 to_many,
697 unpaired,
698 instantiate_fn,
699 ..
700 } = l
701 {
702 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
703 DebugInstantiate::Building => {
704 let to_node = externals
705 .get(to_external_id)
706 .unwrap_or_else(|| {
707 panic!("A external used in the graph was not instantiated: {}", to_external_id)
708 })
709 .clone();
710
711 match input.metadata().location_kind.root() {
712 LocationId::Process(process_id) => {
713 if *to_many {
714 (
715 (
716 D::e2o_many_sink(format!("{}_{}", *to_external_id, *to_port_id)),
717 parse_quote!(DUMMY),
718 ),
719 Box::new(|| {}) as Box<dyn FnOnce()>,
720 )
721 } else {
722 let from_node = processes
723 .get(process_id)
724 .unwrap_or_else(|| {
725 panic!("A process used in the graph was not instantiated: {}", process_id)
726 })
727 .clone();
728
729 let sink_port = from_node.next_port();
730 let source_port = to_node.next_port();
731
732 if *unpaired {
733 use stageleft::quote_type;
734 use tokio_util::codec::LengthDelimitedCodec;
735
736 to_node.register(*to_port_id, source_port.clone());
737
738 let _ = D::e2o_source(
739 refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
740 &to_node, &source_port,
741 &from_node, &sink_port,
742 "e_type::<LengthDelimitedCodec>(),
743 format!("{}_{}", *to_external_id, *to_port_id)
744 );
745 }
746
747 (
748 (
749 D::o2e_sink(
750 &from_node,
751 &sink_port,
752 &to_node,
753 &source_port,
754 format!("{}_{}", *to_external_id, *to_port_id)
755 ),
756 parse_quote!(DUMMY),
757 ),
758 if *unpaired {
759 D::e2o_connect(
760 &to_node,
761 &source_port,
762 &from_node,
763 &sink_port,
764 *to_many,
765 NetworkHint::Auto,
766 )
767 } else {
768 Box::new(|| {}) as Box<dyn FnOnce()>
769 },
770 )
771 }
772 }
773 LocationId::Cluster(_) => todo!(),
774 _ => panic!()
775 }
776 },
777
778 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
779 };
780
781 *instantiate_fn = DebugInstantiateFinalized {
782 sink: sink_expr,
783 source: source_expr,
784 connect_fn: Some(connect_fn),
785 }
786 .into();
787 }
788 },
789 &mut |n| {
790 if let HydroNode::Network {
791 input,
792 instantiate_fn,
793 metadata,
794 ..
795 } = n
796 {
797 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
798 DebugInstantiate::Building => instantiate_network::<D>(
799 input.metadata().location_kind.root(),
800 metadata.location_kind.root(),
801 processes,
802 clusters,
803 ),
804
805 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
806 };
807
808 *instantiate_fn = DebugInstantiateFinalized {
809 sink: sink_expr,
810 source: source_expr,
811 connect_fn: Some(connect_fn),
812 }
813 .into();
814 } else if let HydroNode::ExternalInput {
815 from_external_id,
816 from_port_id,
817 from_many,
818 codec_type,
819 port_hint,
820 instantiate_fn,
821 metadata,
822 ..
823 } = n
824 {
825 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
826 DebugInstantiate::Building => {
827 let from_node = externals
828 .get(from_external_id)
829 .unwrap_or_else(|| {
830 panic!(
831 "A external used in the graph was not instantiated: {}",
832 from_external_id
833 )
834 })
835 .clone();
836
837 match metadata.location_kind.root() {
838 LocationId::Process(process_id) => {
839 let to_node = processes
840 .get(process_id)
841 .unwrap_or_else(|| {
842 panic!("A process used in the graph was not instantiated: {}", process_id)
843 })
844 .clone();
845
846 let sink_port = from_node.next_port();
847 let source_port = to_node.next_port();
848
849 from_node.register(*from_port_id, sink_port.clone());
850
851 (
852 (
853 parse_quote!(DUMMY),
854 if *from_many {
855 D::e2o_many_source(
856 refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
857 &to_node, &source_port,
858 codec_type.0.as_ref(),
859 format!("{}_{}", *from_external_id, *from_port_id)
860 )
861 } else {
862 D::e2o_source(
863 refcell_extra_stmts.borrow_mut().entry(*process_id).or_default(),
864 &from_node, &sink_port,
865 &to_node, &source_port,
866 codec_type.0.as_ref(),
867 format!("{}_{}", *from_external_id, *from_port_id)
868 )
869 },
870 ),
871 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
872 )
873 }
874 LocationId::Cluster(_) => todo!(),
875 _ => panic!()
876 }
877 },
878
879 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
880 };
881
882 *instantiate_fn = DebugInstantiateFinalized {
883 sink: sink_expr,
884 source: source_expr,
885 connect_fn: Some(connect_fn),
886 }
887 .into();
888 }
889 },
890 seen_tees,
891 false,
892 );
893 }
894
895 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
896 self.transform_bottom_up(
897 &mut |l| {
898 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
899 match instantiate_fn {
900 DebugInstantiate::Building => panic!("network not built"),
901
902 DebugInstantiate::Finalized(finalized) => {
903 (finalized.connect_fn.take().unwrap())();
904 }
905 }
906 }
907 },
908 &mut |n| {
909 if let HydroNode::Network { instantiate_fn, .. }
910 | HydroNode::ExternalInput { instantiate_fn, .. } = n
911 {
912 match instantiate_fn {
913 DebugInstantiate::Building => panic!("network not built"),
914
915 DebugInstantiate::Finalized(finalized) => {
916 (finalized.connect_fn.take().unwrap())();
917 }
918 }
919 }
920 },
921 seen_tees,
922 false,
923 );
924 }
925
926 pub fn transform_bottom_up(
927 &mut self,
928 transform_root: &mut impl FnMut(&mut HydroRoot),
929 transform_node: &mut impl FnMut(&mut HydroNode),
930 seen_tees: &mut SeenTees,
931 check_well_formed: bool,
932 ) {
933 self.transform_children(
934 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
935 seen_tees,
936 );
937
938 transform_root(self);
939 }
940
941 pub fn transform_children(
942 &mut self,
943 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
944 seen_tees: &mut SeenTees,
945 ) {
946 match self {
947 HydroRoot::ForEach { input, .. }
948 | HydroRoot::SendExternal { input, .. }
949 | HydroRoot::DestSink { input, .. }
950 | HydroRoot::CycleSink { input, .. } => {
951 transform(input, seen_tees);
952 }
953 }
954 }
955
956 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
957 match self {
958 HydroRoot::ForEach {
959 f,
960 input,
961 op_metadata,
962 } => HydroRoot::ForEach {
963 f: f.clone(),
964 input: Box::new(input.deep_clone(seen_tees)),
965 op_metadata: op_metadata.clone(),
966 },
967 HydroRoot::SendExternal {
968 to_external_id,
969 to_port_id,
970 to_many,
971 unpaired,
972 serialize_fn,
973 instantiate_fn,
974 input,
975 op_metadata,
976 } => HydroRoot::SendExternal {
977 to_external_id: *to_external_id,
978 to_port_id: *to_port_id,
979 to_many: *to_many,
980 unpaired: *unpaired,
981 serialize_fn: serialize_fn.clone(),
982 instantiate_fn: instantiate_fn.clone(),
983 input: Box::new(input.deep_clone(seen_tees)),
984 op_metadata: op_metadata.clone(),
985 },
986 HydroRoot::DestSink {
987 sink,
988 input,
989 op_metadata,
990 } => HydroRoot::DestSink {
991 sink: sink.clone(),
992 input: Box::new(input.deep_clone(seen_tees)),
993 op_metadata: op_metadata.clone(),
994 },
995 HydroRoot::CycleSink {
996 ident,
997 input,
998 op_metadata,
999 } => HydroRoot::CycleSink {
1000 ident: ident.clone(),
1001 input: Box::new(input.deep_clone(seen_tees)),
1002 op_metadata: op_metadata.clone(),
1003 },
1004 }
1005 }
1006
1007 #[cfg(feature = "build")]
1008 pub fn emit<'a, D: Deploy<'a>>(
1009 &mut self,
1010 graph_builders: &mut dyn DfirBuilder,
1011 seen_tees: &mut SeenTees,
1012 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1013 next_stmt_id: &mut usize,
1014 ) {
1015 self.emit_core::<D>(
1016 &mut BuildersOrCallback::<
1017 fn(&mut HydroRoot, &mut usize),
1018 fn(&mut HydroNode, &mut usize),
1019 >::Builders(graph_builders),
1020 seen_tees,
1021 built_tees,
1022 next_stmt_id,
1023 );
1024 }
1025
1026 #[cfg(feature = "build")]
1027 pub fn emit_core<'a, D: Deploy<'a>>(
1028 &mut self,
1029 builders_or_callback: &mut BuildersOrCallback<
1030 impl FnMut(&mut HydroRoot, &mut usize),
1031 impl FnMut(&mut HydroNode, &mut usize),
1032 >,
1033 seen_tees: &mut SeenTees,
1034 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1035 next_stmt_id: &mut usize,
1036 ) {
1037 match self {
1038 HydroRoot::ForEach { f, input, .. } => {
1039 let input_ident =
1040 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1041
1042 match builders_or_callback {
1043 BuildersOrCallback::Builders(graph_builders) => {
1044 graph_builders
1045 .get_dfir_mut(&input.metadata().location_kind)
1046 .add_dfir(
1047 parse_quote! {
1048 #input_ident -> for_each(#f);
1049 },
1050 None,
1051 Some(&next_stmt_id.to_string()),
1052 );
1053 }
1054 BuildersOrCallback::Callback(leaf_callback, _) => {
1055 leaf_callback(self, next_stmt_id);
1056 }
1057 }
1058
1059 *next_stmt_id += 1;
1060 }
1061
1062 HydroRoot::SendExternal {
1063 serialize_fn,
1064 instantiate_fn,
1065 input,
1066 ..
1067 } => {
1068 let input_ident =
1069 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1070
1071 match builders_or_callback {
1072 BuildersOrCallback::Builders(graph_builders) => {
1073 let (sink_expr, _) = match instantiate_fn {
1074 DebugInstantiate::Building => (
1075 syn::parse_quote!(DUMMY_SINK),
1076 syn::parse_quote!(DUMMY_SOURCE),
1077 ),
1078
1079 DebugInstantiate::Finalized(finalized) => {
1080 (finalized.sink.clone(), finalized.source.clone())
1081 }
1082 };
1083
1084 graph_builders.create_external_output(
1085 &input.metadata().location_kind,
1086 sink_expr,
1087 &input_ident,
1088 serialize_fn,
1089 *next_stmt_id,
1090 );
1091 }
1092 BuildersOrCallback::Callback(leaf_callback, _) => {
1093 leaf_callback(self, next_stmt_id);
1094 }
1095 }
1096
1097 *next_stmt_id += 1;
1098 }
1099
1100 HydroRoot::DestSink { sink, input, .. } => {
1101 let input_ident =
1102 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1103
1104 match builders_or_callback {
1105 BuildersOrCallback::Builders(graph_builders) => {
1106 graph_builders
1107 .get_dfir_mut(&input.metadata().location_kind)
1108 .add_dfir(
1109 parse_quote! {
1110 #input_ident -> dest_sink(#sink);
1111 },
1112 None,
1113 Some(&next_stmt_id.to_string()),
1114 );
1115 }
1116 BuildersOrCallback::Callback(leaf_callback, _) => {
1117 leaf_callback(self, next_stmt_id);
1118 }
1119 }
1120
1121 *next_stmt_id += 1;
1122 }
1123
1124 HydroRoot::CycleSink { ident, input, .. } => {
1125 let input_ident =
1126 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1127
1128 match builders_or_callback {
1129 BuildersOrCallback::Builders(graph_builders) => {
1130 let elem_type: syn::Type = match &input.metadata().collection_kind {
1131 CollectionKind::KeyedSingleton {
1132 key_type,
1133 value_type,
1134 ..
1135 }
1136 | CollectionKind::KeyedStream {
1137 key_type,
1138 value_type,
1139 ..
1140 } => {
1141 parse_quote!((#key_type, #value_type))
1142 }
1143 CollectionKind::Stream { element_type, .. }
1144 | CollectionKind::Singleton { element_type, .. }
1145 | CollectionKind::Optional { element_type, .. } => {
1146 parse_quote!(#element_type)
1147 }
1148 };
1149
1150 graph_builders
1151 .get_dfir_mut(&input.metadata().location_kind)
1152 .add_dfir(
1153 parse_quote! {
1154 #ident = #input_ident -> identity::<#elem_type>();
1155 },
1156 None,
1157 None,
1158 );
1159 }
1160 BuildersOrCallback::Callback(_, _) => {}
1162 }
1163 }
1164 }
1165 }
1166
1167 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1168 match self {
1169 HydroRoot::ForEach { op_metadata, .. }
1170 | HydroRoot::SendExternal { op_metadata, .. }
1171 | HydroRoot::DestSink { op_metadata, .. }
1172 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1173 }
1174 }
1175
1176 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1177 match self {
1178 HydroRoot::ForEach { op_metadata, .. }
1179 | HydroRoot::SendExternal { op_metadata, .. }
1180 | HydroRoot::DestSink { op_metadata, .. }
1181 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1182 }
1183 }
1184
1185 pub fn input(&self) -> &HydroNode {
1186 match self {
1187 HydroRoot::ForEach { input, .. }
1188 | HydroRoot::SendExternal { input, .. }
1189 | HydroRoot::DestSink { input, .. }
1190 | HydroRoot::CycleSink { input, .. } => input,
1191 }
1192 }
1193
1194 pub fn input_metadata(&self) -> &HydroIrMetadata {
1195 self.input().metadata()
1196 }
1197
1198 pub fn print_root(&self) -> String {
1199 match self {
1200 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1201 HydroRoot::SendExternal { .. } => "SendExternal".to_string(),
1202 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1203 HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1204 }
1205 }
1206
1207 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1208 match self {
1209 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1210 transform(f);
1211 }
1212 HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1213 }
1214 }
1215}
1216
/// Emits DFIR for every root in `ir` and returns the resulting graph builders,
/// keyed by root location id. Statement ids start at 0 and are shared across
/// all roots.
#[cfg(feature = "build")]
pub fn emit<'a, D: Deploy<'a>>(ir: &mut Vec<HydroRoot>) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut graph_builders: BTreeMap<usize, FlatGraphBuilder> = BTreeMap::new();
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut stmt_counter = 0;

    for root in ir.iter_mut() {
        root.emit::<D>(
            &mut graph_builders,
            &mut seen_tees,
            &mut built_tees,
            &mut stmt_counter,
        );
    }

    graph_builders
}
1233
/// Walks every root in `ir` in emit order, invoking the callbacks instead of
/// building DFIR. Statement ids start at 0 and are shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir<'a, D: Deploy<'a>>(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut seen_tees = HashMap::new();
    let mut built_tees = HashMap::new();
    let mut stmt_counter = 0;
    let mut callbacks = BuildersOrCallback::Callback(transform_root, transform_node);

    for root in ir.iter_mut() {
        root.emit_core::<D>(
            &mut callbacks,
            &mut seen_tees,
            &mut built_tees,
            &mut stmt_counter,
        );
    }
}
1253
1254pub fn transform_bottom_up(
1255 ir: &mut [HydroRoot],
1256 transform_root: &mut impl FnMut(&mut HydroRoot),
1257 transform_node: &mut impl FnMut(&mut HydroNode),
1258 check_well_formed: bool,
1259) {
1260 let mut seen_tees = HashMap::new();
1261 ir.iter_mut().for_each(|leaf| {
1262 leaf.transform_bottom_up(
1263 transform_root,
1264 transform_node,
1265 &mut seen_tees,
1266 check_well_formed,
1267 );
1268 });
1269}
1270
1271pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1272 let mut seen_tees = HashMap::new();
1273 ir.iter()
1274 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1275 .collect()
1276}
1277
/// Per-thread bookkeeping for tee de-duplication while debug-printing:
/// `None` when de-duplication is off, otherwise the next id to hand out plus
/// the ids already assigned, keyed by the tee's `RefCell` address.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Starts out disabled; `dbg_dedup_tee` enables it for the duration of a call.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
1282
1283pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1284 PRINTED_TEES.with(|printed_tees| {
1285 let mut printed_tees_mut = printed_tees.borrow_mut();
1286 *printed_tees_mut = Some((0, HashMap::new()));
1287 drop(printed_tees_mut);
1288
1289 let ret = f();
1290
1291 let mut printed_tees_mut = printed_tees.borrow_mut();
1292 *printed_tees_mut = None;
1293
1294 ret
1295 })
1296}
1297
/// A shared, reference-counted handle to a [`HydroNode`].
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1299
1300impl TeeNode {
1301 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1302 Rc::as_ptr(&self.0)
1303 }
1304}
1305
1306impl Debug for TeeNode {
1307 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1308 PRINTED_TEES.with(|printed_tees| {
1309 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1310 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1311
1312 if let Some(printed_tees_mut) = printed_tees_mut {
1313 if let Some(existing) = printed_tees_mut
1314 .1
1315 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1316 {
1317 write!(f, "<tee {}>", existing)
1318 } else {
1319 let next_id = printed_tees_mut.0;
1320 printed_tees_mut.0 += 1;
1321 printed_tees_mut
1322 .1
1323 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1324 drop(printed_tees_mut_borrow);
1325 write!(f, "<tee {}>: ", next_id)?;
1326 Debug::fmt(&self.0.borrow(), f)
1327 }
1328 } else {
1329 drop(printed_tees_mut_borrow);
1330 write!(f, "<tee>: ")?;
1331 Debug::fmt(&self.0.borrow(), f)
1332 }
1333 })
1334 }
1335}
1336
1337impl Hash for TeeNode {
1338 fn hash<H: Hasher>(&self, state: &mut H) {
1339 self.0.borrow_mut().hash(state);
1340 }
1341}
1342
/// Whether a collection is bounded or unbounded.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    Bounded,
}
1348
/// Ordering property recorded for a stream.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    TotalOrder,
}
1354
/// Delivery/retry property recorded for a stream.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    ExactlyOnce,
}
1360
/// Boundedness recorded for [`CollectionKind::KeyedSingleton`].
///
/// Only `Bounded` makes [`CollectionKind::is_bounded`] return `true`; `BoundedValue`
/// does not.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    BoundedValue,
    Bounded,
}
1367
/// Describes the kind of collection an IR node produces, together with its boundedness,
/// stream properties (where applicable) and element / key / value types.
///
/// Stored in [`HydroIrMetadata::collection_kind`].
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum CollectionKind {
    /// A stream of `element_type` values with the given order and retry properties.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton of `element_type`.
    Singleton {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// An optional of `element_type`.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; the order and retry properties are recorded for the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton, which has its own three-state boundedness.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
1397
1398impl CollectionKind {
1399 pub fn is_bounded(&self) -> bool {
1400 matches!(
1401 self,
1402 CollectionKind::Stream {
1403 bound: BoundKind::Bounded,
1404 ..
1405 } | CollectionKind::Singleton {
1406 bound: BoundKind::Bounded,
1407 ..
1408 } | CollectionKind::Optional {
1409 bound: BoundKind::Bounded,
1410 ..
1411 } | CollectionKind::KeyedStream {
1412 bound: BoundKind::Bounded,
1413 ..
1414 } | CollectionKind::KeyedSingleton {
1415 bound: KeyedSingletonBoundKind::Bounded,
1416 ..
1417 }
1418 )
1419 }
1420}
1421
/// Metadata attached to every [`HydroNode`] variant except `Placeholder`.
///
/// The hand-written `Hash` and `PartialEq` impls for this type ignore all fields, and its
/// `Debug` impl prints only `location_kind` and `collection_kind`.
#[derive(Clone)]
pub struct HydroIrMetadata {
    /// Location of the node's output; `transform_bottom_up` compares the `root()` of this
    /// value between a node and its inputs when checking well-formedness.
    pub location_kind: LocationId,
    /// Kind and properties of the collection the node produces.
    pub collection_kind: CollectionKind,
    // NOTE(review): presumably an estimated or measured element count — confirm with the
    // code that populates it.
    pub cardinality: Option<usize>,
    // NOTE(review): presumably an optional user-facing label — confirm with the code that
    // populates it.
    pub tag: Option<String>,
    /// Per-operator metadata (backtrace, usage figures, id).
    pub op: HydroIrOpMetadata,
}
1430
1431impl Hash for HydroIrMetadata {
1433 fn hash<H: Hasher>(&self, _: &mut H) {}
1434}
1435
1436impl PartialEq for HydroIrMetadata {
1437 fn eq(&self, _: &Self) -> bool {
1438 true
1439 }
1440}
1441
// `PartialEq::eq` above always returns `true`, so the relation is trivially reflexive,
// symmetric and transitive.
impl Eq for HydroIrMetadata {}
1443
1444impl Debug for HydroIrMetadata {
1445 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1446 f.debug_struct("HydroIrMetadata")
1447 .field("location_kind", &self.location_kind)
1448 .field("collection_kind", &self.collection_kind)
1449 .finish()
1450 }
1451}
1452
/// Per-operator metadata stored in [`HydroIrMetadata::op`].
///
/// Its `Debug` impl prints no fields and its `Hash` impl hashes nothing.
#[derive(Clone)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was constructed (see [`HydroIrOpMetadata::new`]).
    pub backtrace: Backtrace,
    // NOTE(review): presumably filled in by profiling after construction (starts as `None`)
    // — confirm units and producer.
    pub cpu_usage: Option<f64>,
    // NOTE(review): as above, presumably a profiling figure for network receive — confirm.
    pub network_recv_cpu_usage: Option<f64>,
    // NOTE(review): starts as `None`; presumably assigned later by a pass — confirm.
    pub id: Option<usize>,
}
1462
impl HydroIrOpMetadata {
    /// Creates metadata with a freshly captured backtrace and `cpu_usage`,
    /// `network_recv_cpu_usage` and `id` all set to `None`.
    ///
    /// Delegates to `new_with_skip(1)`.
    // NOTE(review): the skip count presumably hides this constructor's own frame from the
    // captured backtrace — confirm against `Backtrace::get_backtrace`.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Like [`HydroIrOpMetadata::new`], but passes `2 + skip_count` to
    /// `Backtrace::get_backtrace`.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            // NOTE(review): the constant 2 presumably accounts for this function and the
            // backtrace helper itself — confirm before changing the call structure.
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
1481
1482impl Debug for HydroIrOpMetadata {
1483 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1484 f.debug_struct("HydroIrOpMetadata").finish()
1485 }
1486}
1487
1488impl Hash for HydroIrOpMetadata {
1489 fn hash<H: Hasher>(&self, _: &mut H) {}
1490}
1491
/// A node of the Hydro IR.
///
/// Each variant other than `Placeholder` carries a [`HydroIrMetadata`]. Inputs are owned
/// as `Box<HydroNode>`, except for [`HydroNode::Tee`], whose input is shared through a
/// [`TeeNode`]. The type does not implement `Clone`; use [`HydroNode::deep_clone`].
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in stored in a tee cell while its contents are being transformed or
    /// cloned. `transform_children` and `emit_core` panic if they encounter it.
    Placeholder,

    /// `emit_core` generates no DFIR for this node in builder mode; it only advances the
    /// statement id (and invokes the node callback in callback mode).
    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Lowered by `emit_core` through `observe_nondet` on the graph builders, which
    /// receives the `trusted` flag.
    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    /// A leaf node; `emit_core` lowers it according to the [`HydroSource`] variant.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    /// A leaf node lowered by `emit_core` to `source_iter([value])`, followed by
    /// `persist::<'static>()` unless the node is both top-level and bounded.
    SingletonSource {
        value: DebugExpr,
        metadata: HydroIrMetadata,
    },

    /// A leaf node; `emit_core` uses `ident` directly as this node's output identifier.
    CycleSource {
        ident: syn::Ident,
        metadata: HydroIrMetadata,
    },

    /// Reads from a shared input. `emit_core` emits a `tee()` operator the first time a
    /// given shared input is seen and reuses its identifier afterwards.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Lowered to DFIR `chain()` with `first` on port 0 and `second` on port 1.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Lowered to DFIR `chain_first_n(1)` with `first` on port 0 and `second` on port 1.
    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Lowered to DFIR `cross_join_multiset`.
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Lowered to DFIR `cross_singleton`, with `left` as the `input` port and `right` as
    /// the `single` port.
    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Lowered to DFIR `join_multiset`.
    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Lowered to DFIR `difference`.
    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Lowered to DFIR `anti_join`.
    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Has two inputs: `input` and `watermark`.
    ReduceKeyedWatermark {
        f: DebugExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// The only node that `transform_bottom_up` exempts from the check that a node and
    /// its inputs share the same root location.
    Network {
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A leaf node (it has no `HydroNode` input).
    ExternalInput {
        from_external_id: usize,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
1720
/// Maps the address of an original tee cell to the cell that replaces it, so that a
/// shared tee input is transformed or cloned only once (see
/// `HydroNode::transform_children` and `HydroNode::deep_clone`).
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to a [`LocationId`].
// NOTE(review): presumably records the location determined for each visited tee — confirm
// at the use sites.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1723
1724impl HydroNode {
1725 pub fn transform_bottom_up(
1726 &mut self,
1727 transform: &mut impl FnMut(&mut HydroNode),
1728 seen_tees: &mut SeenTees,
1729 check_well_formed: bool,
1730 ) {
1731 self.transform_children(
1732 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1733 seen_tees,
1734 );
1735
1736 transform(self);
1737
1738 let self_location = self.metadata().location_kind.root();
1739
1740 if check_well_formed {
1741 match &*self {
1742 HydroNode::Network { .. } => {}
1743 _ => {
1744 self.input_metadata().iter().for_each(|i| {
1745 if i.location_kind.root() != self_location {
1746 panic!(
1747 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1748 i,
1749 i.location_kind.root(),
1750 self,
1751 self_location
1752 )
1753 }
1754 });
1755 }
1756 }
1757 }
1758 }
1759
1760 #[inline(always)]
1761 pub fn transform_children(
1762 &mut self,
1763 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1764 seen_tees: &mut SeenTees,
1765 ) {
1766 match self {
1767 HydroNode::Placeholder => {
1768 panic!();
1769 }
1770
1771 HydroNode::Source { .. }
1772 | HydroNode::SingletonSource { .. }
1773 | HydroNode::CycleSource { .. }
1774 | HydroNode::ExternalInput { .. } => {}
1775
1776 HydroNode::Tee { inner, .. } => {
1777 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1778 *inner = TeeNode(transformed.clone());
1779 } else {
1780 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1781 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1782 let mut orig = inner.0.replace(HydroNode::Placeholder);
1783 transform(&mut orig, seen_tees);
1784 *transformed_cell.borrow_mut() = orig;
1785 *inner = TeeNode(transformed_cell);
1786 }
1787 }
1788
1789 HydroNode::Cast { inner, .. }
1790 | HydroNode::ObserveNonDet { inner, .. }
1791 | HydroNode::BeginAtomic { inner, .. }
1792 | HydroNode::EndAtomic { inner, .. }
1793 | HydroNode::Batch { inner, .. }
1794 | HydroNode::YieldConcat { inner, .. } => {
1795 transform(inner.as_mut(), seen_tees);
1796 }
1797
1798 HydroNode::Chain { first, second, .. } => {
1799 transform(first.as_mut(), seen_tees);
1800 transform(second.as_mut(), seen_tees);
1801 }
1802
1803 HydroNode::ChainFirst { first, second, .. } => {
1804 transform(first.as_mut(), seen_tees);
1805 transform(second.as_mut(), seen_tees);
1806 }
1807
1808 HydroNode::CrossSingleton { left, right, .. }
1809 | HydroNode::CrossProduct { left, right, .. }
1810 | HydroNode::Join { left, right, .. } => {
1811 transform(left.as_mut(), seen_tees);
1812 transform(right.as_mut(), seen_tees);
1813 }
1814
1815 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1816 transform(pos.as_mut(), seen_tees);
1817 transform(neg.as_mut(), seen_tees);
1818 }
1819
1820 HydroNode::ReduceKeyedWatermark {
1821 input, watermark, ..
1822 } => {
1823 transform(input.as_mut(), seen_tees);
1824 transform(watermark.as_mut(), seen_tees);
1825 }
1826
1827 HydroNode::Map { input, .. }
1828 | HydroNode::ResolveFutures { input, .. }
1829 | HydroNode::ResolveFuturesOrdered { input, .. }
1830 | HydroNode::FlatMap { input, .. }
1831 | HydroNode::Filter { input, .. }
1832 | HydroNode::FilterMap { input, .. }
1833 | HydroNode::Sort { input, .. }
1834 | HydroNode::DeferTick { input, .. }
1835 | HydroNode::Enumerate { input, .. }
1836 | HydroNode::Inspect { input, .. }
1837 | HydroNode::Unique { input, .. }
1838 | HydroNode::Network { input, .. }
1839 | HydroNode::Fold { input, .. }
1840 | HydroNode::Scan { input, .. }
1841 | HydroNode::FoldKeyed { input, .. }
1842 | HydroNode::Reduce { input, .. }
1843 | HydroNode::ReduceKeyed { input, .. }
1844 | HydroNode::Counter { input, .. } => {
1845 transform(input.as_mut(), seen_tees);
1846 }
1847 }
1848 }
1849
1850 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1851 match self {
1852 HydroNode::Placeholder => HydroNode::Placeholder,
1853 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1854 inner: Box::new(inner.deep_clone(seen_tees)),
1855 metadata: metadata.clone(),
1856 },
1857 HydroNode::ObserveNonDet {
1858 inner,
1859 trusted,
1860 metadata,
1861 } => HydroNode::ObserveNonDet {
1862 inner: Box::new(inner.deep_clone(seen_tees)),
1863 trusted: *trusted,
1864 metadata: metadata.clone(),
1865 },
1866 HydroNode::Source { source, metadata } => HydroNode::Source {
1867 source: source.clone(),
1868 metadata: metadata.clone(),
1869 },
1870 HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1871 value: value.clone(),
1872 metadata: metadata.clone(),
1873 },
1874 HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1875 ident: ident.clone(),
1876 metadata: metadata.clone(),
1877 },
1878 HydroNode::Tee { inner, metadata } => {
1879 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1880 HydroNode::Tee {
1881 inner: TeeNode(transformed.clone()),
1882 metadata: metadata.clone(),
1883 }
1884 } else {
1885 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1886 seen_tees.insert(inner.as_ptr(), new_rc.clone());
1887 let cloned = inner.0.borrow().deep_clone(seen_tees);
1888 *new_rc.borrow_mut() = cloned;
1889 HydroNode::Tee {
1890 inner: TeeNode(new_rc),
1891 metadata: metadata.clone(),
1892 }
1893 }
1894 }
1895 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1896 inner: Box::new(inner.deep_clone(seen_tees)),
1897 metadata: metadata.clone(),
1898 },
1899 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1900 inner: Box::new(inner.deep_clone(seen_tees)),
1901 metadata: metadata.clone(),
1902 },
1903 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1904 inner: Box::new(inner.deep_clone(seen_tees)),
1905 metadata: metadata.clone(),
1906 },
1907 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1908 inner: Box::new(inner.deep_clone(seen_tees)),
1909 metadata: metadata.clone(),
1910 },
1911 HydroNode::Chain {
1912 first,
1913 second,
1914 metadata,
1915 } => HydroNode::Chain {
1916 first: Box::new(first.deep_clone(seen_tees)),
1917 second: Box::new(second.deep_clone(seen_tees)),
1918 metadata: metadata.clone(),
1919 },
1920 HydroNode::ChainFirst {
1921 first,
1922 second,
1923 metadata,
1924 } => HydroNode::ChainFirst {
1925 first: Box::new(first.deep_clone(seen_tees)),
1926 second: Box::new(second.deep_clone(seen_tees)),
1927 metadata: metadata.clone(),
1928 },
1929 HydroNode::CrossProduct {
1930 left,
1931 right,
1932 metadata,
1933 } => HydroNode::CrossProduct {
1934 left: Box::new(left.deep_clone(seen_tees)),
1935 right: Box::new(right.deep_clone(seen_tees)),
1936 metadata: metadata.clone(),
1937 },
1938 HydroNode::CrossSingleton {
1939 left,
1940 right,
1941 metadata,
1942 } => HydroNode::CrossSingleton {
1943 left: Box::new(left.deep_clone(seen_tees)),
1944 right: Box::new(right.deep_clone(seen_tees)),
1945 metadata: metadata.clone(),
1946 },
1947 HydroNode::Join {
1948 left,
1949 right,
1950 metadata,
1951 } => HydroNode::Join {
1952 left: Box::new(left.deep_clone(seen_tees)),
1953 right: Box::new(right.deep_clone(seen_tees)),
1954 metadata: metadata.clone(),
1955 },
1956 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1957 pos: Box::new(pos.deep_clone(seen_tees)),
1958 neg: Box::new(neg.deep_clone(seen_tees)),
1959 metadata: metadata.clone(),
1960 },
1961 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1962 pos: Box::new(pos.deep_clone(seen_tees)),
1963 neg: Box::new(neg.deep_clone(seen_tees)),
1964 metadata: metadata.clone(),
1965 },
1966 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1967 input: Box::new(input.deep_clone(seen_tees)),
1968 metadata: metadata.clone(),
1969 },
1970 HydroNode::ResolveFuturesOrdered { input, metadata } => {
1971 HydroNode::ResolveFuturesOrdered {
1972 input: Box::new(input.deep_clone(seen_tees)),
1973 metadata: metadata.clone(),
1974 }
1975 }
1976 HydroNode::Map { f, input, metadata } => HydroNode::Map {
1977 f: f.clone(),
1978 input: Box::new(input.deep_clone(seen_tees)),
1979 metadata: metadata.clone(),
1980 },
1981 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1982 f: f.clone(),
1983 input: Box::new(input.deep_clone(seen_tees)),
1984 metadata: metadata.clone(),
1985 },
1986 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1987 f: f.clone(),
1988 input: Box::new(input.deep_clone(seen_tees)),
1989 metadata: metadata.clone(),
1990 },
1991 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1992 f: f.clone(),
1993 input: Box::new(input.deep_clone(seen_tees)),
1994 metadata: metadata.clone(),
1995 },
1996 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1997 input: Box::new(input.deep_clone(seen_tees)),
1998 metadata: metadata.clone(),
1999 },
2000 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2001 input: Box::new(input.deep_clone(seen_tees)),
2002 metadata: metadata.clone(),
2003 },
2004 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2005 f: f.clone(),
2006 input: Box::new(input.deep_clone(seen_tees)),
2007 metadata: metadata.clone(),
2008 },
2009 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2010 input: Box::new(input.deep_clone(seen_tees)),
2011 metadata: metadata.clone(),
2012 },
2013 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2014 input: Box::new(input.deep_clone(seen_tees)),
2015 metadata: metadata.clone(),
2016 },
2017 HydroNode::Fold {
2018 init,
2019 acc,
2020 input,
2021 metadata,
2022 } => HydroNode::Fold {
2023 init: init.clone(),
2024 acc: acc.clone(),
2025 input: Box::new(input.deep_clone(seen_tees)),
2026 metadata: metadata.clone(),
2027 },
2028 HydroNode::Scan {
2029 init,
2030 acc,
2031 input,
2032 metadata,
2033 } => HydroNode::Scan {
2034 init: init.clone(),
2035 acc: acc.clone(),
2036 input: Box::new(input.deep_clone(seen_tees)),
2037 metadata: metadata.clone(),
2038 },
2039 HydroNode::FoldKeyed {
2040 init,
2041 acc,
2042 input,
2043 metadata,
2044 } => HydroNode::FoldKeyed {
2045 init: init.clone(),
2046 acc: acc.clone(),
2047 input: Box::new(input.deep_clone(seen_tees)),
2048 metadata: metadata.clone(),
2049 },
2050 HydroNode::ReduceKeyedWatermark {
2051 f,
2052 input,
2053 watermark,
2054 metadata,
2055 } => HydroNode::ReduceKeyedWatermark {
2056 f: f.clone(),
2057 input: Box::new(input.deep_clone(seen_tees)),
2058 watermark: Box::new(watermark.deep_clone(seen_tees)),
2059 metadata: metadata.clone(),
2060 },
2061 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2062 f: f.clone(),
2063 input: Box::new(input.deep_clone(seen_tees)),
2064 metadata: metadata.clone(),
2065 },
2066 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2067 f: f.clone(),
2068 input: Box::new(input.deep_clone(seen_tees)),
2069 metadata: metadata.clone(),
2070 },
2071 HydroNode::Network {
2072 serialize_fn,
2073 instantiate_fn,
2074 deserialize_fn,
2075 input,
2076 metadata,
2077 } => HydroNode::Network {
2078 serialize_fn: serialize_fn.clone(),
2079 instantiate_fn: instantiate_fn.clone(),
2080 deserialize_fn: deserialize_fn.clone(),
2081 input: Box::new(input.deep_clone(seen_tees)),
2082 metadata: metadata.clone(),
2083 },
2084 HydroNode::ExternalInput {
2085 from_external_id,
2086 from_port_id,
2087 from_many,
2088 codec_type,
2089 port_hint,
2090 instantiate_fn,
2091 deserialize_fn,
2092 metadata,
2093 } => HydroNode::ExternalInput {
2094 from_external_id: *from_external_id,
2095 from_port_id: *from_port_id,
2096 from_many: *from_many,
2097 codec_type: codec_type.clone(),
2098 port_hint: *port_hint,
2099 instantiate_fn: instantiate_fn.clone(),
2100 deserialize_fn: deserialize_fn.clone(),
2101 metadata: metadata.clone(),
2102 },
2103 HydroNode::Counter {
2104 tag,
2105 duration,
2106 prefix,
2107 input,
2108 metadata,
2109 } => HydroNode::Counter {
2110 tag: tag.clone(),
2111 duration: duration.clone(),
2112 prefix: prefix.clone(),
2113 input: Box::new(input.deep_clone(seen_tees)),
2114 metadata: metadata.clone(),
2115 },
2116 }
2117 }
2118
2119 #[cfg(feature = "build")]
2120 pub fn emit_core<'a, D: Deploy<'a>>(
2121 &mut self,
2122 builders_or_callback: &mut BuildersOrCallback<
2123 impl FnMut(&mut HydroRoot, &mut usize),
2124 impl FnMut(&mut HydroNode, &mut usize),
2125 >,
2126 seen_tees: &mut SeenTees,
2127 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2128 next_stmt_id: &mut usize,
2129 ) -> syn::Ident {
2130 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2131
2132 self.transform_bottom_up(
2133 &mut |node: &mut HydroNode| {
2134 let out_location = node.metadata().location_kind.clone();
2135 match node {
2136 HydroNode::Placeholder => {
2137 panic!()
2138 }
2139
2140 HydroNode::Cast { .. } => {
2141 match builders_or_callback {
2144 BuildersOrCallback::Builders(_) => {}
2145 BuildersOrCallback::Callback(_, node_callback) => {
2146 node_callback(node, next_stmt_id);
2147 }
2148 }
2149
2150 *next_stmt_id += 1;
2151 }
2153
2154 HydroNode::ObserveNonDet {
2155 inner,
2156 trusted,
2157 metadata,
2158 ..
2159 } => {
2160 let inner_ident = ident_stack.pop().unwrap();
2161
2162 let observe_ident =
2163 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2164
2165 match builders_or_callback {
2166 BuildersOrCallback::Builders(graph_builders) => {
2167 graph_builders.observe_nondet(
2168 *trusted,
2169 &inner.metadata().location_kind,
2170 inner_ident,
2171 &inner.metadata().collection_kind,
2172 &observe_ident,
2173 &metadata.collection_kind,
2174 &metadata.op,
2175 );
2176 }
2177 BuildersOrCallback::Callback(_, node_callback) => {
2178 node_callback(node, next_stmt_id);
2179 }
2180 }
2181
2182 *next_stmt_id += 1;
2183
2184 ident_stack.push(observe_ident);
2185 }
2186
2187 HydroNode::Batch {
2188 inner, metadata, ..
2189 } => {
2190 let inner_ident = ident_stack.pop().unwrap();
2191
2192 let batch_ident =
2193 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2194
2195 match builders_or_callback {
2196 BuildersOrCallback::Builders(graph_builders) => {
2197 graph_builders.batch(
2198 inner_ident,
2199 &inner.metadata().location_kind,
2200 &inner.metadata().collection_kind,
2201 &batch_ident,
2202 &out_location,
2203 &metadata.op,
2204 );
2205 }
2206 BuildersOrCallback::Callback(_, node_callback) => {
2207 node_callback(node, next_stmt_id);
2208 }
2209 }
2210
2211 *next_stmt_id += 1;
2212
2213 ident_stack.push(batch_ident);
2214 }
2215
2216 HydroNode::YieldConcat { inner, .. } => {
2217 let inner_ident = ident_stack.pop().unwrap();
2218
2219 let yield_ident =
2220 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2221
2222 match builders_or_callback {
2223 BuildersOrCallback::Builders(graph_builders) => {
2224 graph_builders.yield_from_tick(
2225 inner_ident,
2226 &inner.metadata().location_kind,
2227 &inner.metadata().collection_kind,
2228 &yield_ident,
2229 &out_location,
2230 );
2231 }
2232 BuildersOrCallback::Callback(_, node_callback) => {
2233 node_callback(node, next_stmt_id);
2234 }
2235 }
2236
2237 *next_stmt_id += 1;
2238
2239 ident_stack.push(yield_ident);
2240 }
2241
2242 HydroNode::BeginAtomic { inner, metadata } => {
2243 let inner_ident = ident_stack.pop().unwrap();
2244
2245 let begin_ident =
2246 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2247
2248 match builders_or_callback {
2249 BuildersOrCallback::Builders(graph_builders) => {
2250 graph_builders.begin_atomic(
2251 inner_ident,
2252 &inner.metadata().location_kind,
2253 &inner.metadata().collection_kind,
2254 &begin_ident,
2255 &out_location,
2256 &metadata.op,
2257 );
2258 }
2259 BuildersOrCallback::Callback(_, node_callback) => {
2260 node_callback(node, next_stmt_id);
2261 }
2262 }
2263
2264 *next_stmt_id += 1;
2265
2266 ident_stack.push(begin_ident);
2267 }
2268
2269 HydroNode::EndAtomic { inner, .. } => {
2270 let inner_ident = ident_stack.pop().unwrap();
2271
2272 let end_ident =
2273 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2274
2275 match builders_or_callback {
2276 BuildersOrCallback::Builders(graph_builders) => {
2277 graph_builders.end_atomic(
2278 inner_ident,
2279 &inner.metadata().location_kind,
2280 &inner.metadata().collection_kind,
2281 &end_ident,
2282 );
2283 }
2284 BuildersOrCallback::Callback(_, node_callback) => {
2285 node_callback(node, next_stmt_id);
2286 }
2287 }
2288
2289 *next_stmt_id += 1;
2290
2291 ident_stack.push(end_ident);
2292 }
2293
2294 HydroNode::Source {
2295 source, metadata, ..
2296 } => {
2297 if let HydroSource::ExternalNetwork() = source {
2298 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2299 } else {
2300 let source_ident =
2301 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2302
2303 let source_stmt = match source {
2304 HydroSource::Stream(expr) => {
2305 debug_assert!(metadata.location_kind.is_top_level());
2306 parse_quote! {
2307 #source_ident = source_stream(#expr);
2308 }
2309 }
2310
2311 HydroSource::ExternalNetwork() => {
2312 unreachable!()
2313 }
2314
2315 HydroSource::Iter(expr) => {
2316 if metadata.location_kind.is_top_level() {
2317 parse_quote! {
2318 #source_ident = source_iter(#expr);
2319 }
2320 } else {
2321 parse_quote! {
2323 #source_ident = source_iter(#expr) -> persist::<'static>();
2324 }
2325 }
2326 }
2327
2328 HydroSource::Spin() => {
2329 debug_assert!(metadata.location_kind.is_top_level());
2330 parse_quote! {
2331 #source_ident = spin();
2332 }
2333 }
2334
2335 HydroSource::ClusterMembers(location_id) => {
2336 debug_assert!(metadata.location_kind.is_top_level());
2337
2338 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2339 D::cluster_membership_stream(location_id),
2340 &(),
2341 );
2342
2343 parse_quote! {
2344 #source_ident = source_stream(#expr);
2345 }
2346 }
2347 };
2348
2349 match builders_or_callback {
2350 BuildersOrCallback::Builders(graph_builders) => {
2351 let builder = graph_builders.get_dfir_mut(&out_location);
2352 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2353 }
2354 BuildersOrCallback::Callback(_, node_callback) => {
2355 node_callback(node, next_stmt_id);
2356 }
2357 }
2358
2359 *next_stmt_id += 1;
2360
2361 ident_stack.push(source_ident);
2362 }
2363 }
2364
2365 HydroNode::SingletonSource { value, metadata } => {
2366 let source_ident =
2367 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2368
2369 match builders_or_callback {
2370 BuildersOrCallback::Builders(graph_builders) => {
2371 let builder = graph_builders.get_dfir_mut(&out_location);
2372
2373 if metadata.location_kind.is_top_level()
2374 && metadata.collection_kind.is_bounded()
2375 {
2376 builder.add_dfir(
2377 parse_quote! {
2378 #source_ident = source_iter([#value]);
2379 },
2380 None,
2381 Some(&next_stmt_id.to_string()),
2382 );
2383 } else {
2384 builder.add_dfir(
2385 parse_quote! {
2386 #source_ident = source_iter([#value]) -> persist::<'static>();
2387 },
2388 None,
2389 Some(&next_stmt_id.to_string()),
2390 );
2391 }
2392 }
2393 BuildersOrCallback::Callback(_, node_callback) => {
2394 node_callback(node, next_stmt_id);
2395 }
2396 }
2397
2398 *next_stmt_id += 1;
2399
2400 ident_stack.push(source_ident);
2401 }
2402
2403 HydroNode::CycleSource { ident, .. } => {
2404 let ident = ident.clone();
2405
2406 match builders_or_callback {
2407 BuildersOrCallback::Builders(_) => {}
2408 BuildersOrCallback::Callback(_, node_callback) => {
2409 node_callback(node, next_stmt_id);
2410 }
2411 }
2412
2413 *next_stmt_id += 1;
2415
2416 ident_stack.push(ident);
2417 }
2418
2419 HydroNode::Tee { inner, .. } => {
2420 let ret_ident = if let Some(teed_from) =
2421 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2422 {
2423 match builders_or_callback {
2424 BuildersOrCallback::Builders(_) => {}
2425 BuildersOrCallback::Callback(_, node_callback) => {
2426 node_callback(node, next_stmt_id);
2427 }
2428 }
2429
2430 teed_from.clone()
2431 } else {
2432 let inner_ident = ident_stack.pop().unwrap();
2435
2436 let tee_ident =
2437 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2438
2439 built_tees.insert(
2440 inner.0.as_ref() as *const RefCell<HydroNode>,
2441 tee_ident.clone(),
2442 );
2443
2444 match builders_or_callback {
2445 BuildersOrCallback::Builders(graph_builders) => {
2446 let builder = graph_builders.get_dfir_mut(&out_location);
2447 builder.add_dfir(
2448 parse_quote! {
2449 #tee_ident = #inner_ident -> tee();
2450 },
2451 None,
2452 Some(&next_stmt_id.to_string()),
2453 );
2454 }
2455 BuildersOrCallback::Callback(_, node_callback) => {
2456 node_callback(node, next_stmt_id);
2457 }
2458 }
2459
2460 tee_ident
2461 };
2462
2463 *next_stmt_id += 1;
2467 ident_stack.push(ret_ident);
2468 }
2469
2470 HydroNode::Chain { .. } => {
2471 let second_ident = ident_stack.pop().unwrap();
2473 let first_ident = ident_stack.pop().unwrap();
2474
2475 let chain_ident =
2476 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2477
2478 match builders_or_callback {
2479 BuildersOrCallback::Builders(graph_builders) => {
2480 let builder = graph_builders.get_dfir_mut(&out_location);
2481 builder.add_dfir(
2482 parse_quote! {
2483 #chain_ident = chain();
2484 #first_ident -> [0]#chain_ident;
2485 #second_ident -> [1]#chain_ident;
2486 },
2487 None,
2488 Some(&next_stmt_id.to_string()),
2489 );
2490 }
2491 BuildersOrCallback::Callback(_, node_callback) => {
2492 node_callback(node, next_stmt_id);
2493 }
2494 }
2495
2496 *next_stmt_id += 1;
2497
2498 ident_stack.push(chain_ident);
2499 }
2500
2501 HydroNode::ChainFirst { .. } => {
2502 let second_ident = ident_stack.pop().unwrap();
2503 let first_ident = ident_stack.pop().unwrap();
2504
2505 let chain_ident =
2506 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2507
2508 match builders_or_callback {
2509 BuildersOrCallback::Builders(graph_builders) => {
2510 let builder = graph_builders.get_dfir_mut(&out_location);
2511 builder.add_dfir(
2512 parse_quote! {
2513 #chain_ident = chain_first_n(1);
2514 #first_ident -> [0]#chain_ident;
2515 #second_ident -> [1]#chain_ident;
2516 },
2517 None,
2518 Some(&next_stmt_id.to_string()),
2519 );
2520 }
2521 BuildersOrCallback::Callback(_, node_callback) => {
2522 node_callback(node, next_stmt_id);
2523 }
2524 }
2525
2526 *next_stmt_id += 1;
2527
2528 ident_stack.push(chain_ident);
2529 }
2530
2531 HydroNode::CrossSingleton { right, .. } => {
2532 let right_ident = ident_stack.pop().unwrap();
2533 let left_ident = ident_stack.pop().unwrap();
2534
2535 let cross_ident =
2536 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2537
2538 match builders_or_callback {
2539 BuildersOrCallback::Builders(graph_builders) => {
2540 let builder = graph_builders.get_dfir_mut(&out_location);
2541
2542 if right.metadata().location_kind.is_top_level()
2543 && right.metadata().collection_kind.is_bounded()
2544 {
2545 builder.add_dfir(
2546 parse_quote! {
2547 #cross_ident = cross_singleton();
2548 #left_ident -> [input]#cross_ident;
2549 #right_ident -> persist::<'static>() -> [single]#cross_ident;
2550 },
2551 None,
2552 Some(&next_stmt_id.to_string()),
2553 );
2554 } else {
2555 builder.add_dfir(
2556 parse_quote! {
2557 #cross_ident = cross_singleton();
2558 #left_ident -> [input]#cross_ident;
2559 #right_ident -> [single]#cross_ident;
2560 },
2561 None,
2562 Some(&next_stmt_id.to_string()),
2563 );
2564 }
2565 }
2566 BuildersOrCallback::Callback(_, node_callback) => {
2567 node_callback(node, next_stmt_id);
2568 }
2569 }
2570
2571 *next_stmt_id += 1;
2572
2573 ident_stack.push(cross_ident);
2574 }
2575
2576 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2577 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2578 parse_quote!(cross_join_multiset)
2579 } else {
2580 parse_quote!(join_multiset)
2581 };
2582
2583 let (HydroNode::CrossProduct { left, right, .. }
2584 | HydroNode::Join { left, right, .. }) = node
2585 else {
2586 unreachable!()
2587 };
2588
2589 let is_top_level = left.metadata().location_kind.is_top_level()
2590 && right.metadata().location_kind.is_top_level();
2591 let left_lifetime = if left.metadata().location_kind.is_top_level() {
2592 quote!('static)
2593 } else {
2594 quote!('tick)
2595 };
2596
2597 let right_lifetime = if right.metadata().location_kind.is_top_level() {
2598 quote!('static)
2599 } else {
2600 quote!('tick)
2601 };
2602
2603 let right_ident = ident_stack.pop().unwrap();
2604 let left_ident = ident_stack.pop().unwrap();
2605
2606 let stream_ident =
2607 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2608
2609 match builders_or_callback {
2610 BuildersOrCallback::Builders(graph_builders) => {
2611 let builder = graph_builders.get_dfir_mut(&out_location);
2612 builder.add_dfir(
2613 if is_top_level {
2614 parse_quote! {
2617 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2618 #left_ident -> [0]#stream_ident;
2619 #right_ident -> [1]#stream_ident;
2620 }
2621 } else {
2622 parse_quote! {
2623 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2624 #left_ident -> [0]#stream_ident;
2625 #right_ident -> [1]#stream_ident;
2626 }
2627 }
2628 ,
2629 None,
2630 Some(&next_stmt_id.to_string()),
2631 );
2632 }
2633 BuildersOrCallback::Callback(_, node_callback) => {
2634 node_callback(node, next_stmt_id);
2635 }
2636 }
2637
2638 *next_stmt_id += 1;
2639
2640 ident_stack.push(stream_ident);
2641 }
2642
2643 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2644 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2645 parse_quote!(difference)
2646 } else {
2647 parse_quote!(anti_join)
2648 };
2649
2650 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2651 node
2652 else {
2653 unreachable!()
2654 };
2655
2656 let neg_lifetime = if neg.metadata().location_kind.is_top_level() {
2657 quote!('static)
2658 } else {
2659 quote!('tick)
2660 };
2661
2662 let neg_ident = ident_stack.pop().unwrap();
2663 let pos_ident = ident_stack.pop().unwrap();
2664
2665 let stream_ident =
2666 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2667
2668 match builders_or_callback {
2669 BuildersOrCallback::Builders(graph_builders) => {
2670 let builder = graph_builders.get_dfir_mut(&out_location);
2671 builder.add_dfir(
2672 parse_quote! {
2673 #stream_ident = #operator::<'tick, #neg_lifetime>();
2674 #pos_ident -> [pos]#stream_ident;
2675 #neg_ident -> [neg]#stream_ident;
2676 },
2677 None,
2678 Some(&next_stmt_id.to_string()),
2679 );
2680 }
2681 BuildersOrCallback::Callback(_, node_callback) => {
2682 node_callback(node, next_stmt_id);
2683 }
2684 }
2685
2686 *next_stmt_id += 1;
2687
2688 ident_stack.push(stream_ident);
2689 }
2690
2691 HydroNode::ResolveFutures { .. } => {
2692 let input_ident = ident_stack.pop().unwrap();
2693
2694 let futures_ident =
2695 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2696
2697 match builders_or_callback {
2698 BuildersOrCallback::Builders(graph_builders) => {
2699 let builder = graph_builders.get_dfir_mut(&out_location);
2700 builder.add_dfir(
2701 parse_quote! {
2702 #futures_ident = #input_ident -> resolve_futures();
2703 },
2704 None,
2705 Some(&next_stmt_id.to_string()),
2706 );
2707 }
2708 BuildersOrCallback::Callback(_, node_callback) => {
2709 node_callback(node, next_stmt_id);
2710 }
2711 }
2712
2713 *next_stmt_id += 1;
2714
2715 ident_stack.push(futures_ident);
2716 }
2717
2718 HydroNode::ResolveFuturesOrdered { .. } => {
2719 let input_ident = ident_stack.pop().unwrap();
2720
2721 let futures_ident =
2722 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2723
2724 match builders_or_callback {
2725 BuildersOrCallback::Builders(graph_builders) => {
2726 let builder = graph_builders.get_dfir_mut(&out_location);
2727 builder.add_dfir(
2728 parse_quote! {
2729 #futures_ident = #input_ident -> resolve_futures_ordered();
2730 },
2731 None,
2732 Some(&next_stmt_id.to_string()),
2733 );
2734 }
2735 BuildersOrCallback::Callback(_, node_callback) => {
2736 node_callback(node, next_stmt_id);
2737 }
2738 }
2739
2740 *next_stmt_id += 1;
2741
2742 ident_stack.push(futures_ident);
2743 }
2744
2745 HydroNode::Map { f, .. } => {
2746 let input_ident = ident_stack.pop().unwrap();
2747
2748 let map_ident =
2749 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2750
2751 match builders_or_callback {
2752 BuildersOrCallback::Builders(graph_builders) => {
2753 let builder = graph_builders.get_dfir_mut(&out_location);
2754 builder.add_dfir(
2755 parse_quote! {
2756 #map_ident = #input_ident -> map(#f);
2757 },
2758 None,
2759 Some(&next_stmt_id.to_string()),
2760 );
2761 }
2762 BuildersOrCallback::Callback(_, node_callback) => {
2763 node_callback(node, next_stmt_id);
2764 }
2765 }
2766
2767 *next_stmt_id += 1;
2768
2769 ident_stack.push(map_ident);
2770 }
2771
2772 HydroNode::FlatMap { f, .. } => {
2773 let input_ident = ident_stack.pop().unwrap();
2774
2775 let flat_map_ident =
2776 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2777
2778 match builders_or_callback {
2779 BuildersOrCallback::Builders(graph_builders) => {
2780 let builder = graph_builders.get_dfir_mut(&out_location);
2781 builder.add_dfir(
2782 parse_quote! {
2783 #flat_map_ident = #input_ident -> flat_map(#f);
2784 },
2785 None,
2786 Some(&next_stmt_id.to_string()),
2787 );
2788 }
2789 BuildersOrCallback::Callback(_, node_callback) => {
2790 node_callback(node, next_stmt_id);
2791 }
2792 }
2793
2794 *next_stmt_id += 1;
2795
2796 ident_stack.push(flat_map_ident);
2797 }
2798
2799 HydroNode::Filter { f, .. } => {
2800 let input_ident = ident_stack.pop().unwrap();
2801
2802 let filter_ident =
2803 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2804
2805 match builders_or_callback {
2806 BuildersOrCallback::Builders(graph_builders) => {
2807 let builder = graph_builders.get_dfir_mut(&out_location);
2808 builder.add_dfir(
2809 parse_quote! {
2810 #filter_ident = #input_ident -> filter(#f);
2811 },
2812 None,
2813 Some(&next_stmt_id.to_string()),
2814 );
2815 }
2816 BuildersOrCallback::Callback(_, node_callback) => {
2817 node_callback(node, next_stmt_id);
2818 }
2819 }
2820
2821 *next_stmt_id += 1;
2822
2823 ident_stack.push(filter_ident);
2824 }
2825
2826 HydroNode::FilterMap { f, .. } => {
2827 let input_ident = ident_stack.pop().unwrap();
2828
2829 let filter_map_ident =
2830 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2831
2832 match builders_or_callback {
2833 BuildersOrCallback::Builders(graph_builders) => {
2834 let builder = graph_builders.get_dfir_mut(&out_location);
2835 builder.add_dfir(
2836 parse_quote! {
2837 #filter_map_ident = #input_ident -> filter_map(#f);
2838 },
2839 None,
2840 Some(&next_stmt_id.to_string()),
2841 );
2842 }
2843 BuildersOrCallback::Callback(_, node_callback) => {
2844 node_callback(node, next_stmt_id);
2845 }
2846 }
2847
2848 *next_stmt_id += 1;
2849
2850 ident_stack.push(filter_map_ident);
2851 }
2852
2853 HydroNode::Sort { .. } => {
2854 let input_ident = ident_stack.pop().unwrap();
2855
2856 let sort_ident =
2857 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2858
2859 match builders_or_callback {
2860 BuildersOrCallback::Builders(graph_builders) => {
2861 let builder = graph_builders.get_dfir_mut(&out_location);
2862 builder.add_dfir(
2863 parse_quote! {
2864 #sort_ident = #input_ident -> sort();
2865 },
2866 None,
2867 Some(&next_stmt_id.to_string()),
2868 );
2869 }
2870 BuildersOrCallback::Callback(_, node_callback) => {
2871 node_callback(node, next_stmt_id);
2872 }
2873 }
2874
2875 *next_stmt_id += 1;
2876
2877 ident_stack.push(sort_ident);
2878 }
2879
2880 HydroNode::DeferTick { .. } => {
2881 let input_ident = ident_stack.pop().unwrap();
2882
2883 let defer_tick_ident =
2884 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2885
2886 match builders_or_callback {
2887 BuildersOrCallback::Builders(graph_builders) => {
2888 let builder = graph_builders.get_dfir_mut(&out_location);
2889 builder.add_dfir(
2890 parse_quote! {
2891 #defer_tick_ident = #input_ident -> defer_tick_lazy();
2892 },
2893 None,
2894 Some(&next_stmt_id.to_string()),
2895 );
2896 }
2897 BuildersOrCallback::Callback(_, node_callback) => {
2898 node_callback(node, next_stmt_id);
2899 }
2900 }
2901
2902 *next_stmt_id += 1;
2903
2904 ident_stack.push(defer_tick_ident);
2905 }
2906
2907 HydroNode::Enumerate { input, .. } => {
2908 let input_ident = ident_stack.pop().unwrap();
2909
2910 let enumerate_ident =
2911 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2912
2913 match builders_or_callback {
2914 BuildersOrCallback::Builders(graph_builders) => {
2915 let builder = graph_builders.get_dfir_mut(&out_location);
2916 let lifetime = if input.metadata().location_kind.is_top_level() {
2917 quote!('static)
2918 } else {
2919 quote!('tick)
2920 };
2921 builder.add_dfir(
2922 parse_quote! {
2923 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2924 },
2925 None,
2926 Some(&next_stmt_id.to_string()),
2927 );
2928 }
2929 BuildersOrCallback::Callback(_, node_callback) => {
2930 node_callback(node, next_stmt_id);
2931 }
2932 }
2933
2934 *next_stmt_id += 1;
2935
2936 ident_stack.push(enumerate_ident);
2937 }
2938
2939 HydroNode::Inspect { f, .. } => {
2940 let input_ident = ident_stack.pop().unwrap();
2941
2942 let inspect_ident =
2943 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2944
2945 match builders_or_callback {
2946 BuildersOrCallback::Builders(graph_builders) => {
2947 let builder = graph_builders.get_dfir_mut(&out_location);
2948 builder.add_dfir(
2949 parse_quote! {
2950 #inspect_ident = #input_ident -> inspect(#f);
2951 },
2952 None,
2953 Some(&next_stmt_id.to_string()),
2954 );
2955 }
2956 BuildersOrCallback::Callback(_, node_callback) => {
2957 node_callback(node, next_stmt_id);
2958 }
2959 }
2960
2961 *next_stmt_id += 1;
2962
2963 ident_stack.push(inspect_ident);
2964 }
2965
2966 HydroNode::Unique { input, .. } => {
2967 let input_ident = ident_stack.pop().unwrap();
2968
2969 let unique_ident =
2970 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2971
2972 match builders_or_callback {
2973 BuildersOrCallback::Builders(graph_builders) => {
2974 let builder = graph_builders.get_dfir_mut(&out_location);
2975 let lifetime = if input.metadata().location_kind.is_top_level() {
2976 quote!('static)
2977 } else {
2978 quote!('tick)
2979 };
2980
2981 builder.add_dfir(
2982 parse_quote! {
2983 #unique_ident = #input_ident -> unique::<#lifetime>();
2984 },
2985 None,
2986 Some(&next_stmt_id.to_string()),
2987 );
2988 }
2989 BuildersOrCallback::Callback(_, node_callback) => {
2990 node_callback(node, next_stmt_id);
2991 }
2992 }
2993
2994 *next_stmt_id += 1;
2995
2996 ident_stack.push(unique_ident);
2997 }
2998
2999 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3000 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3001 if input.metadata().location_kind.is_top_level()
3002 && input.metadata().collection_kind.is_bounded()
3003 {
3004 parse_quote!(fold_no_replay)
3005 } else {
3006 parse_quote!(fold)
3007 }
3008 } else if matches!(node, HydroNode::Scan { .. }) {
3009 parse_quote!(scan)
3010 } else if let HydroNode::FoldKeyed { input, .. } = node {
3011 if input.metadata().location_kind.is_top_level()
3012 && input.metadata().collection_kind.is_bounded()
3013 {
3014 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3015 } else {
3016 parse_quote!(fold_keyed)
3017 }
3018 } else {
3019 unreachable!()
3020 };
3021
3022 let (HydroNode::Fold { input, .. }
3023 | HydroNode::FoldKeyed { input, .. }
3024 | HydroNode::Scan { input, .. }) = node
3025 else {
3026 unreachable!()
3027 };
3028
3029 let lifetime = if input.metadata().location_kind.is_top_level() {
3030 quote!('static)
3031 } else {
3032 quote!('tick)
3033 };
3034
3035 let input_ident = ident_stack.pop().unwrap();
3036
3037 let (HydroNode::Fold { init, acc, .. }
3038 | HydroNode::FoldKeyed { init, acc, .. }
3039 | HydroNode::Scan { init, acc, .. }) = &*node
3040 else {
3041 unreachable!()
3042 };
3043
3044 let fold_ident =
3045 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3046
3047 match builders_or_callback {
3048 BuildersOrCallback::Builders(graph_builders) => {
3049 if matches!(node, HydroNode::Fold { .. })
3050 && node.metadata().location_kind.is_top_level()
3051 && !(matches!(node.metadata().location_kind, LocationId::Atomic(_)))
3052 && graph_builders.singleton_intermediates()
3053 && !node.metadata().collection_kind.is_bounded()
3054 {
3055 let builder = graph_builders.get_dfir_mut(&out_location);
3056
3057 let acc: syn::Expr = parse_quote!({
3058 let mut __inner = #acc;
3059 move |__state, __value| {
3060 __inner(__state, __value);
3061 Some(__state.clone())
3062 }
3063 });
3064
3065 builder.add_dfir(
3066 parse_quote! {
3067 source_iter([(#init)()]) -> [0]#fold_ident;
3068 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3069 #fold_ident = chain();
3070 },
3071 None,
3072 Some(&next_stmt_id.to_string()),
3073 );
3074 } else if matches!(node, HydroNode::FoldKeyed { .. })
3075 && node.metadata().location_kind.is_top_level()
3076 && !(matches!(node.metadata().location_kind, LocationId::Atomic(_)))
3077 && graph_builders.singleton_intermediates()
3078 && !node.metadata().collection_kind.is_bounded()
3079 {
3080 let builder = graph_builders.get_dfir_mut(&out_location);
3081
3082 let acc: syn::Expr = parse_quote!({
3083 let mut __init = #init;
3084 let mut __inner = #acc;
3085 move |__state, __kv: (_, _)| {
3086 let __state = __state
3088 .entry(::std::clone::Clone::clone(&__kv.0))
3089 .or_insert_with(|| (__init)());
3090 __inner(__state, __kv.1);
3091 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3092 }
3093 });
3094
3095 builder.add_dfir(
3096 parse_quote! {
3097 source_iter([(#init)()]) -> [0]#fold_ident;
3098 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3099 },
3100 None,
3101 Some(&next_stmt_id.to_string()),
3102 );
3103 } else {
3104 let builder = graph_builders.get_dfir_mut(&out_location);
3105 builder.add_dfir(
3106 parse_quote! {
3107 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3108 },
3109 None,
3110 Some(&next_stmt_id.to_string()),
3111 );
3112 }
3113 }
3114 BuildersOrCallback::Callback(_, node_callback) => {
3115 node_callback(node, next_stmt_id);
3116 }
3117 }
3118
3119 *next_stmt_id += 1;
3120
3121 ident_stack.push(fold_ident);
3122 }
3123
3124 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3125 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3126 if input.metadata().location_kind.is_top_level()
3127 && input.metadata().collection_kind.is_bounded()
3128 {
3129 parse_quote!(reduce_no_replay)
3130 } else {
3131 parse_quote!(reduce)
3132 }
3133 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3134 if input.metadata().location_kind.is_top_level()
3135 && input.metadata().collection_kind.is_bounded()
3136 {
3137 todo!(
3138 "Calling keyed reduce on a top-level bounded collection is not supported"
3139 )
3140 } else {
3141 parse_quote!(reduce_keyed)
3142 }
3143 } else {
3144 unreachable!()
3145 };
3146
3147 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3148 else {
3149 unreachable!()
3150 };
3151
3152 let lifetime = if input.metadata().location_kind.is_top_level() {
3153 quote!('static)
3154 } else {
3155 quote!('tick)
3156 };
3157
3158 let input_ident = ident_stack.pop().unwrap();
3159
3160 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3161 else {
3162 unreachable!()
3163 };
3164
3165 let reduce_ident =
3166 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3167
3168 match builders_or_callback {
3169 BuildersOrCallback::Builders(graph_builders) => {
3170 if matches!(node, HydroNode::Reduce { .. })
3171 && node.metadata().location_kind.is_top_level()
3172 && !(matches!(node.metadata().location_kind, LocationId::Atomic(_)))
3173 && graph_builders.singleton_intermediates()
3174 && !node.metadata().collection_kind.is_bounded()
3175 {
3176 todo!(
3177 "Reduce with optional intermediates is not yet supported in simulator"
3178 );
3179 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3180 && node.metadata().location_kind.is_top_level()
3181 && !(matches!(node.metadata().location_kind, LocationId::Atomic(_)))
3182 && graph_builders.singleton_intermediates()
3183 && !node.metadata().collection_kind.is_bounded()
3184 {
3185 todo!(
3186 "Reduce keyed with optional intermediates is not yet supported in simulator"
3187 );
3188 } else {
3189 let builder = graph_builders.get_dfir_mut(&out_location);
3190 builder.add_dfir(
3191 parse_quote! {
3192 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3193 },
3194 None,
3195 Some(&next_stmt_id.to_string()),
3196 );
3197 }
3198 }
3199 BuildersOrCallback::Callback(_, node_callback) => {
3200 node_callback(node, next_stmt_id);
3201 }
3202 }
3203
3204 *next_stmt_id += 1;
3205
3206 ident_stack.push(reduce_ident);
3207 }
3208
3209 HydroNode::ReduceKeyedWatermark {
3210 f,
3211 input,
3212 metadata,
3213 ..
3214 } => {
3215 let lifetime = if input.metadata().location_kind.is_top_level() {
3216 quote!('static)
3217 } else {
3218 quote!('tick)
3219 };
3220
3221 let watermark_ident = ident_stack.pop().unwrap();
3223 let input_ident = ident_stack.pop().unwrap();
3224
3225 let chain_ident = syn::Ident::new(
3226 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3227 Span::call_site(),
3228 );
3229
3230 let fold_ident =
3231 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3232
3233 let agg_operator: syn::Ident = if input.metadata().location_kind.is_top_level()
3234 && input.metadata().collection_kind.is_bounded()
3235 {
3236 parse_quote!(fold_no_replay)
3237 } else {
3238 parse_quote!(fold)
3239 };
3240
3241 match builders_or_callback {
3242 BuildersOrCallback::Builders(graph_builders) => {
3243 if metadata.location_kind.is_top_level()
3244 && !(matches!(metadata.location_kind, LocationId::Atomic(_)))
3245 && graph_builders.singleton_intermediates()
3246 && !metadata.collection_kind.is_bounded()
3247 {
3248 todo!(
3249 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3250 )
3251 } else {
3252 let builder = graph_builders.get_dfir_mut(&out_location);
3253 builder.add_dfir(
3254 parse_quote! {
3255 #chain_ident = chain();
3256 #input_ident
3257 -> map(|x| (Some(x), None))
3258 -> [0]#chain_ident;
3259 #watermark_ident
3260 -> map(|watermark| (None, Some(watermark)))
3261 -> [1]#chain_ident;
3262
3263 #fold_ident = #chain_ident
3264 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3265 let __reduce_keyed_fn = #f;
3266 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3267 if let Some((k, v)) = opt_payload {
3268 if let Some(curr_watermark) = *opt_curr_watermark {
3269 if k <= curr_watermark {
3270 return;
3271 }
3272 }
3273 match map.entry(k) {
3274 ::std::collections::hash_map::Entry::Vacant(e) => {
3275 e.insert(v);
3276 }
3277 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3278 __reduce_keyed_fn(e.get_mut(), v);
3279 }
3280 }
3281 } else {
3282 let watermark = opt_watermark.unwrap();
3283 if let Some(curr_watermark) = *opt_curr_watermark {
3284 if watermark <= curr_watermark {
3285 return;
3286 }
3287 }
3288 *opt_curr_watermark = opt_watermark;
3289 map.retain(|k, _| *k > watermark);
3290 }
3291 }
3292 })
3293 -> flat_map(|(map, _curr_watermark)| map);
3294 },
3295 None,
3296 Some(&next_stmt_id.to_string()),
3297 );
3298 }
3299 }
3300 BuildersOrCallback::Callback(_, node_callback) => {
3301 node_callback(node, next_stmt_id);
3302 }
3303 }
3304
3305 *next_stmt_id += 1;
3306
3307 ident_stack.push(fold_ident);
3308 }
3309
3310 HydroNode::Network {
3311 serialize_fn: serialize_pipeline,
3312 instantiate_fn,
3313 deserialize_fn: deserialize_pipeline,
3314 input,
3315 ..
3316 } => {
3317 let input_ident = ident_stack.pop().unwrap();
3318
3319 let receiver_stream_ident =
3320 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3321
3322 match builders_or_callback {
3323 BuildersOrCallback::Builders(graph_builders) => {
3324 let (sink_expr, source_expr) = match instantiate_fn {
3325 DebugInstantiate::Building => (
3326 syn::parse_quote!(DUMMY_SINK),
3327 syn::parse_quote!(DUMMY_SOURCE),
3328 ),
3329
3330 DebugInstantiate::Finalized(finalized) => {
3331 (finalized.sink.clone(), finalized.source.clone())
3332 }
3333 };
3334
3335 graph_builders.create_network(
3336 &input.metadata().location_kind,
3337 &out_location,
3338 input_ident,
3339 &receiver_stream_ident,
3340 serialize_pipeline,
3341 sink_expr,
3342 source_expr,
3343 deserialize_pipeline,
3344 *next_stmt_id,
3345 );
3346 }
3347 BuildersOrCallback::Callback(_, node_callback) => {
3348 node_callback(node, next_stmt_id);
3349 }
3350 }
3351
3352 *next_stmt_id += 1;
3353
3354 ident_stack.push(receiver_stream_ident);
3355 }
3356
3357 HydroNode::ExternalInput {
3358 instantiate_fn,
3359 deserialize_fn: deserialize_pipeline,
3360 ..
3361 } => {
3362 let receiver_stream_ident =
3363 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3364
3365 match builders_or_callback {
3366 BuildersOrCallback::Builders(graph_builders) => {
3367 let (_, source_expr) = match instantiate_fn {
3368 DebugInstantiate::Building => (
3369 syn::parse_quote!(DUMMY_SINK),
3370 syn::parse_quote!(DUMMY_SOURCE),
3371 ),
3372
3373 DebugInstantiate::Finalized(finalized) => {
3374 (finalized.sink.clone(), finalized.source.clone())
3375 }
3376 };
3377
3378 graph_builders.create_external_source(
3379 &out_location,
3380 source_expr,
3381 &receiver_stream_ident,
3382 deserialize_pipeline,
3383 *next_stmt_id,
3384 );
3385 }
3386 BuildersOrCallback::Callback(_, node_callback) => {
3387 node_callback(node, next_stmt_id);
3388 }
3389 }
3390
3391 *next_stmt_id += 1;
3392
3393 ident_stack.push(receiver_stream_ident);
3394 }
3395
3396 HydroNode::Counter {
3397 tag,
3398 duration,
3399 prefix,
3400 ..
3401 } => {
3402 let input_ident = ident_stack.pop().unwrap();
3403
3404 let counter_ident =
3405 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3406
3407 match builders_or_callback {
3408 BuildersOrCallback::Builders(graph_builders) => {
3409 let builder = graph_builders.get_dfir_mut(&out_location);
3410 builder.add_dfir(
3411 parse_quote! {
3412 #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3413 },
3414 None,
3415 Some(&next_stmt_id.to_string()),
3416 );
3417 }
3418 BuildersOrCallback::Callback(_, node_callback) => {
3419 node_callback(node, next_stmt_id);
3420 }
3421 }
3422
3423 *next_stmt_id += 1;
3424
3425 ident_stack.push(counter_ident);
3426 }
3427 }
3428 },
3429 seen_tees,
3430 false,
3431 );
3432
3433 ident_stack
3434 .pop()
3435 .expect("ident_stack should have exactly one element after traversal")
3436 }
3437
3438 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3439 match self {
3440 HydroNode::Placeholder => {
3441 panic!()
3442 }
3443 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3444 HydroNode::Source { source, .. } => match source {
3445 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3446 HydroSource::ExternalNetwork()
3447 | HydroSource::Spin()
3448 | HydroSource::ClusterMembers(_) => {} },
3450 HydroNode::SingletonSource { value, .. } => {
3451 transform(value);
3452 }
3453 HydroNode::CycleSource { .. }
3454 | HydroNode::Tee { .. }
3455 | HydroNode::YieldConcat { .. }
3456 | HydroNode::BeginAtomic { .. }
3457 | HydroNode::EndAtomic { .. }
3458 | HydroNode::Batch { .. }
3459 | HydroNode::Chain { .. }
3460 | HydroNode::ChainFirst { .. }
3461 | HydroNode::CrossProduct { .. }
3462 | HydroNode::CrossSingleton { .. }
3463 | HydroNode::ResolveFutures { .. }
3464 | HydroNode::ResolveFuturesOrdered { .. }
3465 | HydroNode::Join { .. }
3466 | HydroNode::Difference { .. }
3467 | HydroNode::AntiJoin { .. }
3468 | HydroNode::DeferTick { .. }
3469 | HydroNode::Enumerate { .. }
3470 | HydroNode::Unique { .. }
3471 | HydroNode::Sort { .. } => {}
3472 HydroNode::Map { f, .. }
3473 | HydroNode::FlatMap { f, .. }
3474 | HydroNode::Filter { f, .. }
3475 | HydroNode::FilterMap { f, .. }
3476 | HydroNode::Inspect { f, .. }
3477 | HydroNode::Reduce { f, .. }
3478 | HydroNode::ReduceKeyed { f, .. }
3479 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3480 transform(f);
3481 }
3482 HydroNode::Fold { init, acc, .. }
3483 | HydroNode::Scan { init, acc, .. }
3484 | HydroNode::FoldKeyed { init, acc, .. } => {
3485 transform(init);
3486 transform(acc);
3487 }
3488 HydroNode::Network {
3489 serialize_fn,
3490 deserialize_fn,
3491 ..
3492 } => {
3493 if let Some(serialize_fn) = serialize_fn {
3494 transform(serialize_fn);
3495 }
3496 if let Some(deserialize_fn) = deserialize_fn {
3497 transform(deserialize_fn);
3498 }
3499 }
3500 HydroNode::ExternalInput { deserialize_fn, .. } => {
3501 if let Some(deserialize_fn) = deserialize_fn {
3502 transform(deserialize_fn);
3503 }
3504 }
3505 HydroNode::Counter { duration, .. } => {
3506 transform(duration);
3507 }
3508 }
3509 }
3510
3511 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3512 &self.metadata().op
3513 }
3514
3515 pub fn metadata(&self) -> &HydroIrMetadata {
3516 match self {
3517 HydroNode::Placeholder => {
3518 panic!()
3519 }
3520 HydroNode::Cast { metadata, .. } => metadata,
3521 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3522 HydroNode::Source { metadata, .. } => metadata,
3523 HydroNode::SingletonSource { metadata, .. } => metadata,
3524 HydroNode::CycleSource { metadata, .. } => metadata,
3525 HydroNode::Tee { metadata, .. } => metadata,
3526 HydroNode::YieldConcat { metadata, .. } => metadata,
3527 HydroNode::BeginAtomic { metadata, .. } => metadata,
3528 HydroNode::EndAtomic { metadata, .. } => metadata,
3529 HydroNode::Batch { metadata, .. } => metadata,
3530 HydroNode::Chain { metadata, .. } => metadata,
3531 HydroNode::ChainFirst { metadata, .. } => metadata,
3532 HydroNode::CrossProduct { metadata, .. } => metadata,
3533 HydroNode::CrossSingleton { metadata, .. } => metadata,
3534 HydroNode::Join { metadata, .. } => metadata,
3535 HydroNode::Difference { metadata, .. } => metadata,
3536 HydroNode::AntiJoin { metadata, .. } => metadata,
3537 HydroNode::ResolveFutures { metadata, .. } => metadata,
3538 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3539 HydroNode::Map { metadata, .. } => metadata,
3540 HydroNode::FlatMap { metadata, .. } => metadata,
3541 HydroNode::Filter { metadata, .. } => metadata,
3542 HydroNode::FilterMap { metadata, .. } => metadata,
3543 HydroNode::DeferTick { metadata, .. } => metadata,
3544 HydroNode::Enumerate { metadata, .. } => metadata,
3545 HydroNode::Inspect { metadata, .. } => metadata,
3546 HydroNode::Unique { metadata, .. } => metadata,
3547 HydroNode::Sort { metadata, .. } => metadata,
3548 HydroNode::Scan { metadata, .. } => metadata,
3549 HydroNode::Fold { metadata, .. } => metadata,
3550 HydroNode::FoldKeyed { metadata, .. } => metadata,
3551 HydroNode::Reduce { metadata, .. } => metadata,
3552 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3553 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3554 HydroNode::ExternalInput { metadata, .. } => metadata,
3555 HydroNode::Network { metadata, .. } => metadata,
3556 HydroNode::Counter { metadata, .. } => metadata,
3557 }
3558 }
3559
3560 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3561 &mut self.metadata_mut().op
3562 }
3563
3564 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3565 match self {
3566 HydroNode::Placeholder => {
3567 panic!()
3568 }
3569 HydroNode::Cast { metadata, .. } => metadata,
3570 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3571 HydroNode::Source { metadata, .. } => metadata,
3572 HydroNode::SingletonSource { metadata, .. } => metadata,
3573 HydroNode::CycleSource { metadata, .. } => metadata,
3574 HydroNode::Tee { metadata, .. } => metadata,
3575 HydroNode::YieldConcat { metadata, .. } => metadata,
3576 HydroNode::BeginAtomic { metadata, .. } => metadata,
3577 HydroNode::EndAtomic { metadata, .. } => metadata,
3578 HydroNode::Batch { metadata, .. } => metadata,
3579 HydroNode::Chain { metadata, .. } => metadata,
3580 HydroNode::ChainFirst { metadata, .. } => metadata,
3581 HydroNode::CrossProduct { metadata, .. } => metadata,
3582 HydroNode::CrossSingleton { metadata, .. } => metadata,
3583 HydroNode::Join { metadata, .. } => metadata,
3584 HydroNode::Difference { metadata, .. } => metadata,
3585 HydroNode::AntiJoin { metadata, .. } => metadata,
3586 HydroNode::ResolveFutures { metadata, .. } => metadata,
3587 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3588 HydroNode::Map { metadata, .. } => metadata,
3589 HydroNode::FlatMap { metadata, .. } => metadata,
3590 HydroNode::Filter { metadata, .. } => metadata,
3591 HydroNode::FilterMap { metadata, .. } => metadata,
3592 HydroNode::DeferTick { metadata, .. } => metadata,
3593 HydroNode::Enumerate { metadata, .. } => metadata,
3594 HydroNode::Inspect { metadata, .. } => metadata,
3595 HydroNode::Unique { metadata, .. } => metadata,
3596 HydroNode::Sort { metadata, .. } => metadata,
3597 HydroNode::Scan { metadata, .. } => metadata,
3598 HydroNode::Fold { metadata, .. } => metadata,
3599 HydroNode::FoldKeyed { metadata, .. } => metadata,
3600 HydroNode::Reduce { metadata, .. } => metadata,
3601 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3602 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3603 HydroNode::ExternalInput { metadata, .. } => metadata,
3604 HydroNode::Network { metadata, .. } => metadata,
3605 HydroNode::Counter { metadata, .. } => metadata,
3606 }
3607 }
3608
3609 pub fn input(&self) -> Vec<&HydroNode> {
3610 match self {
3611 HydroNode::Placeholder => {
3612 panic!()
3613 }
3614 HydroNode::Source { .. }
3615 | HydroNode::SingletonSource { .. }
3616 | HydroNode::ExternalInput { .. }
3617 | HydroNode::CycleSource { .. }
3618 | HydroNode::Tee { .. } => {
3619 vec![]
3621 }
3622 HydroNode::Cast { inner, .. }
3623 | HydroNode::ObserveNonDet { inner, .. }
3624 | HydroNode::YieldConcat { inner, .. }
3625 | HydroNode::BeginAtomic { inner, .. }
3626 | HydroNode::EndAtomic { inner, .. }
3627 | HydroNode::Batch { inner, .. } => {
3628 vec![inner]
3629 }
3630 HydroNode::Chain { first, second, .. } => {
3631 vec![first, second]
3632 }
3633 HydroNode::ChainFirst { first, second, .. } => {
3634 vec![first, second]
3635 }
3636 HydroNode::CrossProduct { left, right, .. }
3637 | HydroNode::CrossSingleton { left, right, .. }
3638 | HydroNode::Join { left, right, .. } => {
3639 vec![left, right]
3640 }
3641 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3642 vec![pos, neg]
3643 }
3644 HydroNode::Map { input, .. }
3645 | HydroNode::FlatMap { input, .. }
3646 | HydroNode::Filter { input, .. }
3647 | HydroNode::FilterMap { input, .. }
3648 | HydroNode::Sort { input, .. }
3649 | HydroNode::DeferTick { input, .. }
3650 | HydroNode::Enumerate { input, .. }
3651 | HydroNode::Inspect { input, .. }
3652 | HydroNode::Unique { input, .. }
3653 | HydroNode::Network { input, .. }
3654 | HydroNode::Counter { input, .. }
3655 | HydroNode::ResolveFutures { input, .. }
3656 | HydroNode::ResolveFuturesOrdered { input, .. }
3657 | HydroNode::Fold { input, .. }
3658 | HydroNode::FoldKeyed { input, .. }
3659 | HydroNode::Reduce { input, .. }
3660 | HydroNode::ReduceKeyed { input, .. }
3661 | HydroNode::Scan { input, .. } => {
3662 vec![input]
3663 }
3664 HydroNode::ReduceKeyedWatermark {
3665 input, watermark, ..
3666 } => {
3667 vec![input, watermark]
3668 }
3669 }
3670 }
3671
3672 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3673 self.input()
3674 .iter()
3675 .map(|input_node| input_node.metadata())
3676 .collect()
3677 }
3678
3679 pub fn print_root(&self) -> String {
3680 match self {
3681 HydroNode::Placeholder => {
3682 panic!()
3683 }
3684 HydroNode::Cast { .. } => "Cast()".to_string(),
3685 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_string(),
3686 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3687 HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3688 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3689 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3690 HydroNode::YieldConcat { .. } => "YieldConcat()".to_string(),
3691 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_string(),
3692 HydroNode::EndAtomic { .. } => "EndAtomic()".to_string(),
3693 HydroNode::Batch { .. } => "Batch()".to_string(),
3694 HydroNode::Chain { first, second, .. } => {
3695 format!("Chain({}, {})", first.print_root(), second.print_root())
3696 }
3697 HydroNode::ChainFirst { first, second, .. } => {
3698 format!(
3699 "ChainFirst({}, {})",
3700 first.print_root(),
3701 second.print_root()
3702 )
3703 }
3704 HydroNode::CrossProduct { left, right, .. } => {
3705 format!(
3706 "CrossProduct({}, {})",
3707 left.print_root(),
3708 right.print_root()
3709 )
3710 }
3711 HydroNode::CrossSingleton { left, right, .. } => {
3712 format!(
3713 "CrossSingleton({}, {})",
3714 left.print_root(),
3715 right.print_root()
3716 )
3717 }
3718 HydroNode::Join { left, right, .. } => {
3719 format!("Join({}, {})", left.print_root(), right.print_root())
3720 }
3721 HydroNode::Difference { pos, neg, .. } => {
3722 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3723 }
3724 HydroNode::AntiJoin { pos, neg, .. } => {
3725 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3726 }
3727 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
3728 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
3729 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3730 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3731 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3732 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3733 HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
3734 HydroNode::Enumerate { .. } => "Enumerate()".to_string(),
3735 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3736 HydroNode::Unique { .. } => "Unique()".to_string(),
3737 HydroNode::Sort { .. } => "Sort()".to_string(),
3738 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3739 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3740 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3741 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3742 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3743 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3744 HydroNode::Network { .. } => "Network()".to_string(),
3745 HydroNode::ExternalInput { .. } => "ExternalInput()".to_string(),
3746 HydroNode::Counter { tag, duration, .. } => {
3747 format!("Counter({:?}, {:?})", tag, duration)
3748 }
3749 }
3750 }
3751}
3752
/// Looks up the deployed node registered under `id` in `nodes`.
///
/// `kind` is only used for the panic message (`"process"` or `"cluster"`).
///
/// # Panics
///
/// Panics with `A <kind> used in the graph was not instantiated: <id>` when `id` has no
/// entry in `nodes`.
#[cfg(feature = "build")]
fn expect_instantiated<'m, N>(nodes: &'m HashMap<usize, N>, kind: &str, id: usize) -> &'m N {
    nodes.get(&id).unwrap_or_else(|| {
        panic!("A {} used in the graph was not instantiated: {}", kind, id)
    })
}

/// Sets up one network edge between two deployed locations.
///
/// Both endpoints are looked up (processes in `processes`, clusters in `clusters`),
/// `next_port()` is called on the sender and then on the receiver, and the deployment
/// backend `D` is asked for the `(sink, source)` expressions and the connect callback
/// that match the endpoint kinds:
///
/// | from    | to      | backend functions                    |
/// |---------|---------|--------------------------------------|
/// | process | process | `o2o_sink_source` / `o2o_connect`    |
/// | process | cluster | `o2m_sink_source` / `o2m_connect`    |
/// | cluster | process | `m2o_sink_source` / `m2o_connect`    |
/// | cluster | cluster | `m2m_sink_source` / `m2m_connect`    |
///
/// Returns `(sink, source, connect_fn)`.
///
/// # Panics
///
/// * If an endpoint id is missing from the corresponding map.
/// * If either location is a `LocationId::Tick` or `LocationId::Atomic`; only processes
///   and clusters are handled here.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (LocationId::Process(from), LocationId::Process(to)) => {
            let from_node = expect_instantiated(processes, "process", *from).clone();
            let to_node = expect_instantiated(processes, "process", *to).clone();

            // Sender port is always requested before the receiver port.
            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Process(from), LocationId::Cluster(to)) => {
            let from_node = expect_instantiated(processes, "process", *from).clone();
            let to_node = expect_instantiated(clusters, "cluster", *to).clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::o2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Process(to)) => {
            let from_node = expect_instantiated(clusters, "cluster", *from).clone();
            let to_node = expect_instantiated(processes, "process", *to).clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
            let from_node = expect_instantiated(clusters, "cluster", *from).clone();
            let to_node = expect_instantiated(clusters, "cluster", *to).clone();

            let sink_port = from_node.next_port();
            let source_port = to_node.next_port();

            (
                D::m2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        // Tick and atomic locations are not handled as network endpoints.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _)
        | (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => {
            panic!("network endpoints must be a process or a cluster")
        }
    };
    (sink, source, connect_fn)
}
3859
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    /// Pins `size_of::<HydroNode>()` to 240 bytes.
    ///
    /// Ignored without the `build` feature because the expected size assumes the
    /// feature-gated fields are present.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        let actual = size_of::<HydroNode>();
        assert_eq!(actual, 240);
    }

    /// Pins `size_of::<HydroRoot>()` to 136 bytes.
    ///
    /// Ignored without the `build` feature because the expected size assumes the
    /// feature-gated fields are present.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        let actual = size_of::<HydroRoot>();
        assert_eq!(actual, 136);
    }

    /// `x + y` must come back from `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let expected = syn::parse_str::<syn::Expr>("x + y").unwrap();
        let result = simplify_q_macro(expected.clone());
        assert_eq!(result, expected);
    }

    /// Snapshots the simplified form of a spliced `q!` closure.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let spliced = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    /// Snapshots the simplified form of a spliced `q!` block whose closure is the trailing
    /// expression rather than the first token.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let spliced = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(spliced);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}