1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::fmt::{Debug, Display};
5use std::hash::{Hash, Hasher};
6use std::ops::Deref;
7use std::rc::Rc;
8
9#[cfg(feature = "build")]
10use dfir_lang::graph::FlatGraphBuilder;
11#[cfg(feature = "build")]
12use proc_macro2::Span;
13use proc_macro2::TokenStream;
14use quote::ToTokens;
15#[cfg(feature = "build")]
16use quote::quote;
17#[cfg(feature = "build")]
18use slotmap::{SecondaryMap, SparseSecondaryMap};
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24use crate::compile::builder::ExternalPortId;
25#[cfg(feature = "build")]
26use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
27use crate::location::dynamic::LocationId;
28use crate::location::{LocationKey, NetworkHint};
29
30pub mod backtrace;
31use backtrace::Backtrace;
32
33#[derive(Clone, Hash)]
37pub struct DebugExpr(pub Box<syn::Expr>);
38
39impl From<syn::Expr> for DebugExpr {
40 fn from(expr: syn::Expr) -> Self {
41 Self(Box::new(expr))
42 }
43}
44
45impl Deref for DebugExpr {
46 type Target = syn::Expr;
47
48 fn deref(&self) -> &Self::Target {
49 &self.0
50 }
51}
52
53impl ToTokens for DebugExpr {
54 fn to_tokens(&self, tokens: &mut TokenStream) {
55 self.0.to_tokens(tokens);
56 }
57}
58
59impl Debug for DebugExpr {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 write!(f, "{}", self.0.to_token_stream())
62 }
63}
64
65impl Display for DebugExpr {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 let original = self.0.as_ref().clone();
68 let simplified = simplify_q_macro(original);
69
70 write!(f, "q!({})", quote::quote!(#simplified))
73 }
74}
75
76fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
78 let mut simplifier = QMacroSimplifier::new();
81 simplifier.visit_expr_mut(&mut expr);
82
83 if let Some(simplified) = simplifier.simplified_result {
85 simplified
86 } else {
87 expr
88 }
89}
90
91#[derive(Default)]
93pub struct QMacroSimplifier {
94 pub simplified_result: Option<syn::Expr>,
95}
96
97impl QMacroSimplifier {
98 pub fn new() -> Self {
99 Self::default()
100 }
101}
102
103impl VisitMut for QMacroSimplifier {
104 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
105 if self.simplified_result.is_some() {
107 return;
108 }
109
110 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
111 && self.is_stageleft_runtime_support_call(&path_expr.path)
113 && let Some(closure) = self.extract_closure_from_args(&call.args)
115 {
116 self.simplified_result = Some(closure);
117 return;
118 }
119
120 syn::visit_mut::visit_expr_mut(self, expr);
123 }
124}
125
126impl QMacroSimplifier {
127 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
128 if let Some(last_segment) = path.segments.last() {
130 let fn_name = last_segment.ident.to_string();
131 fn_name.contains("_type_hint")
133 && path.segments.len() > 2
134 && path.segments[0].ident == "stageleft"
135 && path.segments[1].ident == "runtime_support"
136 } else {
137 false
138 }
139 }
140
141 fn extract_closure_from_args(
142 &self,
143 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
144 ) -> Option<syn::Expr> {
145 for arg in args {
147 if let syn::Expr::Closure(_) = arg {
148 return Some(arg.clone());
149 }
150 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
152 return Some(closure_expr);
153 }
154 }
155 None
156 }
157
158 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
159 let mut visitor = ClosureFinder {
160 found_closure: None,
161 prefer_inner_blocks: true,
162 };
163 visitor.visit_expr(expr);
164 visitor.found_closure
165 }
166}
167
168struct ClosureFinder {
170 found_closure: Option<syn::Expr>,
171 prefer_inner_blocks: bool,
172}
173
174impl<'ast> Visit<'ast> for ClosureFinder {
175 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
176 if self.found_closure.is_some() {
178 return;
179 }
180
181 match expr {
182 syn::Expr::Closure(_) => {
183 self.found_closure = Some(expr.clone());
184 }
185 syn::Expr::Block(block) if self.prefer_inner_blocks => {
186 for stmt in &block.block.stmts {
188 if let syn::Stmt::Expr(stmt_expr, _) = stmt
189 && let syn::Expr::Block(_) = stmt_expr
190 {
191 let mut inner_visitor = ClosureFinder {
193 found_closure: None,
194 prefer_inner_blocks: false, };
196 inner_visitor.visit_expr(stmt_expr);
197 if inner_visitor.found_closure.is_some() {
198 self.found_closure = Some(stmt_expr.clone());
200 return;
201 }
202 }
203 }
204
205 visit::visit_expr(self, expr);
207
208 if self.found_closure.is_some() {
211 }
213 }
214 _ => {
215 visit::visit_expr(self, expr);
217 }
218 }
219 }
220}
221
222#[derive(Clone, PartialEq, Eq, Hash)]
226pub struct DebugType(pub Box<syn::Type>);
227
228impl From<syn::Type> for DebugType {
229 fn from(t: syn::Type) -> Self {
230 Self(Box::new(t))
231 }
232}
233
234impl Deref for DebugType {
235 type Target = syn::Type;
236
237 fn deref(&self) -> &Self::Target {
238 &self.0
239 }
240}
241
242impl ToTokens for DebugType {
243 fn to_tokens(&self, tokens: &mut TokenStream) {
244 self.0.to_tokens(tokens);
245 }
246}
247
248impl Debug for DebugType {
249 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250 write!(f, "{}", self.0.to_token_stream())
251 }
252}
253
254pub enum DebugInstantiate {
255 Building,
256 Finalized(Box<DebugInstantiateFinalized>),
257}
258
259#[cfg_attr(
260 not(feature = "build"),
261 expect(
262 dead_code,
263 reason = "sink, source unused without `feature = \"build\"`."
264 )
265)]
266pub struct DebugInstantiateFinalized {
267 sink: syn::Expr,
268 source: syn::Expr,
269 connect_fn: Option<Box<dyn FnOnce()>>,
270}
271
272impl From<DebugInstantiateFinalized> for DebugInstantiate {
273 fn from(f: DebugInstantiateFinalized) -> Self {
274 Self::Finalized(Box::new(f))
275 }
276}
277
278impl Debug for DebugInstantiate {
279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280 write!(f, "<network instantiate>")
281 }
282}
283
284impl Hash for DebugInstantiate {
285 fn hash<H: Hasher>(&self, _state: &mut H) {
286 }
288}
289
290impl Clone for DebugInstantiate {
291 fn clone(&self) -> Self {
292 match self {
293 DebugInstantiate::Building => DebugInstantiate::Building,
294 DebugInstantiate::Finalized(_) => {
295 panic!("DebugInstantiate::Finalized should not be cloned")
296 }
297 }
298 }
299}
300
301#[derive(Debug, Hash, Clone)]
303pub enum HydroSource {
304 Stream(DebugExpr),
305 ExternalNetwork(),
306 Iter(DebugExpr),
307 Spin(),
308 ClusterMembers(LocationId),
309 Embedded(syn::Ident),
310}
311
312#[cfg(feature = "build")]
313pub trait DfirBuilder {
319 fn singleton_intermediates(&self) -> bool;
321
322 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
324
325 fn batch(
326 &mut self,
327 in_ident: syn::Ident,
328 in_location: &LocationId,
329 in_kind: &CollectionKind,
330 out_ident: &syn::Ident,
331 out_location: &LocationId,
332 op_meta: &HydroIrOpMetadata,
333 );
334 fn yield_from_tick(
335 &mut self,
336 in_ident: syn::Ident,
337 in_location: &LocationId,
338 in_kind: &CollectionKind,
339 out_ident: &syn::Ident,
340 out_location: &LocationId,
341 );
342
343 fn begin_atomic(
344 &mut self,
345 in_ident: syn::Ident,
346 in_location: &LocationId,
347 in_kind: &CollectionKind,
348 out_ident: &syn::Ident,
349 out_location: &LocationId,
350 op_meta: &HydroIrOpMetadata,
351 );
352 fn end_atomic(
353 &mut self,
354 in_ident: syn::Ident,
355 in_location: &LocationId,
356 in_kind: &CollectionKind,
357 out_ident: &syn::Ident,
358 );
359
360 #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
361 fn observe_nondet(
362 &mut self,
363 trusted: bool,
364 location: &LocationId,
365 in_ident: syn::Ident,
366 in_kind: &CollectionKind,
367 out_ident: &syn::Ident,
368 out_kind: &CollectionKind,
369 op_meta: &HydroIrOpMetadata,
370 );
371
372 #[expect(clippy::too_many_arguments, reason = "TODO")]
373 fn create_network(
374 &mut self,
375 from: &LocationId,
376 to: &LocationId,
377 input_ident: syn::Ident,
378 out_ident: &syn::Ident,
379 serialize: Option<&DebugExpr>,
380 sink: syn::Expr,
381 source: syn::Expr,
382 deserialize: Option<&DebugExpr>,
383 tag_id: usize,
384 );
385
386 fn create_external_source(
387 &mut self,
388 on: &LocationId,
389 source_expr: syn::Expr,
390 out_ident: &syn::Ident,
391 deserialize: Option<&DebugExpr>,
392 tag_id: usize,
393 );
394
395 fn create_external_output(
396 &mut self,
397 on: &LocationId,
398 sink_expr: syn::Expr,
399 input_ident: &syn::Ident,
400 serialize: Option<&DebugExpr>,
401 tag_id: usize,
402 );
403}
404
405#[cfg(feature = "build")]
406impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
407 fn singleton_intermediates(&self) -> bool {
408 false
409 }
410
411 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
412 self.entry(location.root().key())
413 .expect("location was removed")
414 .or_default()
415 }
416
417 fn batch(
418 &mut self,
419 in_ident: syn::Ident,
420 in_location: &LocationId,
421 in_kind: &CollectionKind,
422 out_ident: &syn::Ident,
423 _out_location: &LocationId,
424 _op_meta: &HydroIrOpMetadata,
425 ) {
426 let builder = self.get_dfir_mut(in_location.root());
427 if in_kind.is_bounded()
428 && matches!(
429 in_kind,
430 CollectionKind::Singleton { .. }
431 | CollectionKind::Optional { .. }
432 | CollectionKind::KeyedSingleton { .. }
433 )
434 {
435 assert!(in_location.is_top_level());
436 builder.add_dfir(
437 parse_quote! {
438 #out_ident = #in_ident -> persist::<'static>();
439 },
440 None,
441 None,
442 );
443 } else {
444 builder.add_dfir(
445 parse_quote! {
446 #out_ident = #in_ident;
447 },
448 None,
449 None,
450 );
451 }
452 }
453
454 fn yield_from_tick(
455 &mut self,
456 in_ident: syn::Ident,
457 in_location: &LocationId,
458 _in_kind: &CollectionKind,
459 out_ident: &syn::Ident,
460 _out_location: &LocationId,
461 ) {
462 let builder = self.get_dfir_mut(in_location.root());
463 builder.add_dfir(
464 parse_quote! {
465 #out_ident = #in_ident;
466 },
467 None,
468 None,
469 );
470 }
471
472 fn begin_atomic(
473 &mut self,
474 in_ident: syn::Ident,
475 in_location: &LocationId,
476 _in_kind: &CollectionKind,
477 out_ident: &syn::Ident,
478 _out_location: &LocationId,
479 _op_meta: &HydroIrOpMetadata,
480 ) {
481 let builder = self.get_dfir_mut(in_location.root());
482 builder.add_dfir(
483 parse_quote! {
484 #out_ident = #in_ident;
485 },
486 None,
487 None,
488 );
489 }
490
491 fn end_atomic(
492 &mut self,
493 in_ident: syn::Ident,
494 in_location: &LocationId,
495 _in_kind: &CollectionKind,
496 out_ident: &syn::Ident,
497 ) {
498 let builder = self.get_dfir_mut(in_location.root());
499 builder.add_dfir(
500 parse_quote! {
501 #out_ident = #in_ident;
502 },
503 None,
504 None,
505 );
506 }
507
508 fn observe_nondet(
509 &mut self,
510 _trusted: bool,
511 location: &LocationId,
512 in_ident: syn::Ident,
513 _in_kind: &CollectionKind,
514 out_ident: &syn::Ident,
515 _out_kind: &CollectionKind,
516 _op_meta: &HydroIrOpMetadata,
517 ) {
518 let builder = self.get_dfir_mut(location);
519 builder.add_dfir(
520 parse_quote! {
521 #out_ident = #in_ident;
522 },
523 None,
524 None,
525 );
526 }
527
528 fn create_network(
529 &mut self,
530 from: &LocationId,
531 to: &LocationId,
532 input_ident: syn::Ident,
533 out_ident: &syn::Ident,
534 serialize: Option<&DebugExpr>,
535 sink: syn::Expr,
536 source: syn::Expr,
537 deserialize: Option<&DebugExpr>,
538 tag_id: usize,
539 ) {
540 let sender_builder = self.get_dfir_mut(from);
541 if let Some(serialize_pipeline) = serialize {
542 sender_builder.add_dfir(
543 parse_quote! {
544 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
545 },
546 None,
547 Some(&format!("send{}", tag_id)),
549 );
550 } else {
551 sender_builder.add_dfir(
552 parse_quote! {
553 #input_ident -> dest_sink(#sink);
554 },
555 None,
556 Some(&format!("send{}", tag_id)),
557 );
558 }
559
560 let receiver_builder = self.get_dfir_mut(to);
561 if let Some(deserialize_pipeline) = deserialize {
562 receiver_builder.add_dfir(
563 parse_quote! {
564 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
565 },
566 None,
567 Some(&format!("recv{}", tag_id)),
568 );
569 } else {
570 receiver_builder.add_dfir(
571 parse_quote! {
572 #out_ident = source_stream(#source);
573 },
574 None,
575 Some(&format!("recv{}", tag_id)),
576 );
577 }
578 }
579
580 fn create_external_source(
581 &mut self,
582 on: &LocationId,
583 source_expr: syn::Expr,
584 out_ident: &syn::Ident,
585 deserialize: Option<&DebugExpr>,
586 tag_id: usize,
587 ) {
588 let receiver_builder = self.get_dfir_mut(on);
589 if let Some(deserialize_pipeline) = deserialize {
590 receiver_builder.add_dfir(
591 parse_quote! {
592 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
593 },
594 None,
595 Some(&format!("recv{}", tag_id)),
596 );
597 } else {
598 receiver_builder.add_dfir(
599 parse_quote! {
600 #out_ident = source_stream(#source_expr);
601 },
602 None,
603 Some(&format!("recv{}", tag_id)),
604 );
605 }
606 }
607
608 fn create_external_output(
609 &mut self,
610 on: &LocationId,
611 sink_expr: syn::Expr,
612 input_ident: &syn::Ident,
613 serialize: Option<&DebugExpr>,
614 tag_id: usize,
615 ) {
616 let sender_builder = self.get_dfir_mut(on);
617 if let Some(serialize_fn) = serialize {
618 sender_builder.add_dfir(
619 parse_quote! {
620 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
621 },
622 None,
623 Some(&format!("send{}", tag_id)),
625 );
626 } else {
627 sender_builder.add_dfir(
628 parse_quote! {
629 #input_ident -> dest_sink(#sink_expr);
630 },
631 None,
632 Some(&format!("send{}", tag_id)),
633 );
634 }
635 }
636}
637
638#[cfg(feature = "build")]
639pub enum BuildersOrCallback<'a, L, N>
640where
641 L: FnMut(&mut HydroRoot, &mut usize),
642 N: FnMut(&mut HydroNode, &mut usize),
643{
644 Builders(&'a mut dyn DfirBuilder),
645 Callback(L, N),
646}
647
648#[derive(Debug, Hash)]
652pub enum HydroRoot {
653 ForEach {
654 f: DebugExpr,
655 input: Box<HydroNode>,
656 op_metadata: HydroIrOpMetadata,
657 },
658 SendExternal {
659 to_external_key: LocationKey,
660 to_port_id: ExternalPortId,
661 to_many: bool,
662 unpaired: bool,
663 serialize_fn: Option<DebugExpr>,
664 instantiate_fn: DebugInstantiate,
665 input: Box<HydroNode>,
666 op_metadata: HydroIrOpMetadata,
667 },
668 DestSink {
669 sink: DebugExpr,
670 input: Box<HydroNode>,
671 op_metadata: HydroIrOpMetadata,
672 },
673 CycleSink {
674 ident: syn::Ident,
675 input: Box<HydroNode>,
676 op_metadata: HydroIrOpMetadata,
677 },
678 EmbeddedOutput {
679 ident: syn::Ident,
680 input: Box<HydroNode>,
681 op_metadata: HydroIrOpMetadata,
682 },
683}
684
685impl HydroRoot {
686 #[cfg(feature = "build")]
687 pub fn compile_network<'a, D>(
688 &mut self,
689 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
690 seen_tees: &mut SeenTees,
691 processes: &SparseSecondaryMap<LocationKey, D::Process>,
692 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
693 externals: &SparseSecondaryMap<LocationKey, D::External>,
694 env: &mut D::InstantiateEnv,
695 ) where
696 D: Deploy<'a>,
697 {
698 let refcell_extra_stmts = RefCell::new(extra_stmts);
699 let refcell_env = RefCell::new(env);
700 self.transform_bottom_up(
701 &mut |l| {
702 if let HydroRoot::SendExternal {
703 input,
704 to_external_key,
705 to_port_id,
706 to_many,
707 unpaired,
708 instantiate_fn,
709 ..
710 } = l
711 {
712 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
713 DebugInstantiate::Building => {
714 let to_node = externals
715 .get(*to_external_key)
716 .unwrap_or_else(|| {
717 panic!("A external used in the graph was not instantiated: {}", to_external_key)
718 })
719 .clone();
720
721 match input.metadata().location_id.root() {
722 &LocationId::Process(process_key) => {
723 if *to_many {
724 (
725 (
726 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
727 parse_quote!(DUMMY),
728 ),
729 Box::new(|| {}) as Box<dyn FnOnce()>,
730 )
731 } else {
732 let from_node = processes
733 .get(process_key)
734 .unwrap_or_else(|| {
735 panic!("A process used in the graph was not instantiated: {}", process_key)
736 })
737 .clone();
738
739 let sink_port = from_node.next_port();
740 let source_port = to_node.next_port();
741
742 if *unpaired {
743 use stageleft::quote_type;
744 use tokio_util::codec::LengthDelimitedCodec;
745
746 to_node.register(*to_port_id, source_port.clone());
747
748 let _ = D::e2o_source(
749 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
750 &to_node, &source_port,
751 &from_node, &sink_port,
752 "e_type::<LengthDelimitedCodec>(),
753 format!("{}_{}", *to_external_key, *to_port_id)
754 );
755 }
756
757 (
758 (
759 D::o2e_sink(
760 &from_node,
761 &sink_port,
762 &to_node,
763 &source_port,
764 format!("{}_{}", *to_external_key, *to_port_id)
765 ),
766 parse_quote!(DUMMY),
767 ),
768 if *unpaired {
769 D::e2o_connect(
770 &to_node,
771 &source_port,
772 &from_node,
773 &sink_port,
774 *to_many,
775 NetworkHint::Auto,
776 )
777 } else {
778 Box::new(|| {}) as Box<dyn FnOnce()>
779 },
780 )
781 }
782 }
783 LocationId::Cluster(_) => todo!(),
784 _ => panic!()
785 }
786 },
787
788 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
789 };
790
791 *instantiate_fn = DebugInstantiateFinalized {
792 sink: sink_expr,
793 source: source_expr,
794 connect_fn: Some(connect_fn),
795 }
796 .into();
797 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
798 let element_type = match &input.metadata().collection_kind {
799 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
800 _ => panic!("Embedded output must have Stream collection kind"),
801 };
802 let location_key = match input.metadata().location_id.root() {
803 LocationId::Process(key) | LocationId::Cluster(key) => *key,
804 _ => panic!("Embedded output must be on a process or cluster"),
805 };
806 D::register_embedded_output(
807 &mut refcell_env.borrow_mut(),
808 location_key,
809 ident,
810 &element_type,
811 );
812 }
813 },
814 &mut |n| {
815 if let HydroNode::Network {
816 input,
817 instantiate_fn,
818 metadata,
819 ..
820 } = n
821 {
822 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
823 DebugInstantiate::Building => instantiate_network::<D>(
824 input.metadata().location_id.root(),
825 metadata.location_id.root(),
826 processes,
827 clusters,
828 ),
829
830 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
831 };
832
833 *instantiate_fn = DebugInstantiateFinalized {
834 sink: sink_expr,
835 source: source_expr,
836 connect_fn: Some(connect_fn),
837 }
838 .into();
839 } else if let HydroNode::ExternalInput {
840 from_external_key,
841 from_port_id,
842 from_many,
843 codec_type,
844 port_hint,
845 instantiate_fn,
846 metadata,
847 ..
848 } = n
849 {
850 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
851 DebugInstantiate::Building => {
852 let from_node = externals
853 .get(*from_external_key)
854 .unwrap_or_else(|| {
855 panic!(
856 "A external used in the graph was not instantiated: {}",
857 from_external_key,
858 )
859 })
860 .clone();
861
862 match metadata.location_id.root() {
863 &LocationId::Process(process_key) => {
864 let to_node = processes
865 .get(process_key)
866 .unwrap_or_else(|| {
867 panic!("A process used in the graph was not instantiated: {}", process_key)
868 })
869 .clone();
870
871 let sink_port = from_node.next_port();
872 let source_port = to_node.next_port();
873
874 from_node.register(*from_port_id, sink_port.clone());
875
876 (
877 (
878 parse_quote!(DUMMY),
879 if *from_many {
880 D::e2o_many_source(
881 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
882 &to_node, &source_port,
883 codec_type.0.as_ref(),
884 format!("{}_{}", *from_external_key, *from_port_id)
885 )
886 } else {
887 D::e2o_source(
888 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
889 &from_node, &sink_port,
890 &to_node, &source_port,
891 codec_type.0.as_ref(),
892 format!("{}_{}", *from_external_key, *from_port_id)
893 )
894 },
895 ),
896 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
897 )
898 }
899 LocationId::Cluster(_) => todo!(),
900 _ => panic!()
901 }
902 },
903
904 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
905 };
906
907 *instantiate_fn = DebugInstantiateFinalized {
908 sink: sink_expr,
909 source: source_expr,
910 connect_fn: Some(connect_fn),
911 }
912 .into();
913 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
914 let element_type = match &metadata.collection_kind {
915 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
916 _ => panic!("Embedded source must have Stream collection kind"),
917 };
918 let location_key = match metadata.location_id.root() {
919 LocationId::Process(key) | LocationId::Cluster(key) => *key,
920 _ => panic!("Embedded source must be on a process or cluster"),
921 };
922 D::register_embedded_input(
923 &mut refcell_env.borrow_mut(),
924 location_key,
925 ident,
926 &element_type,
927 );
928 }
929 },
930 seen_tees,
931 false,
932 );
933 }
934
935 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
936 self.transform_bottom_up(
937 &mut |l| {
938 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
939 match instantiate_fn {
940 DebugInstantiate::Building => panic!("network not built"),
941
942 DebugInstantiate::Finalized(finalized) => {
943 (finalized.connect_fn.take().unwrap())();
944 }
945 }
946 }
947 },
948 &mut |n| {
949 if let HydroNode::Network { instantiate_fn, .. }
950 | HydroNode::ExternalInput { instantiate_fn, .. } = n
951 {
952 match instantiate_fn {
953 DebugInstantiate::Building => panic!("network not built"),
954
955 DebugInstantiate::Finalized(finalized) => {
956 (finalized.connect_fn.take().unwrap())();
957 }
958 }
959 }
960 },
961 seen_tees,
962 false,
963 );
964 }
965
966 pub fn transform_bottom_up(
967 &mut self,
968 transform_root: &mut impl FnMut(&mut HydroRoot),
969 transform_node: &mut impl FnMut(&mut HydroNode),
970 seen_tees: &mut SeenTees,
971 check_well_formed: bool,
972 ) {
973 self.transform_children(
974 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
975 seen_tees,
976 );
977
978 transform_root(self);
979 }
980
981 pub fn transform_children(
982 &mut self,
983 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
984 seen_tees: &mut SeenTees,
985 ) {
986 match self {
987 HydroRoot::ForEach { input, .. }
988 | HydroRoot::SendExternal { input, .. }
989 | HydroRoot::DestSink { input, .. }
990 | HydroRoot::CycleSink { input, .. }
991 | HydroRoot::EmbeddedOutput { input, .. } => {
992 transform(input, seen_tees);
993 }
994 }
995 }
996
997 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
998 match self {
999 HydroRoot::ForEach {
1000 f,
1001 input,
1002 op_metadata,
1003 } => HydroRoot::ForEach {
1004 f: f.clone(),
1005 input: Box::new(input.deep_clone(seen_tees)),
1006 op_metadata: op_metadata.clone(),
1007 },
1008 HydroRoot::SendExternal {
1009 to_external_key,
1010 to_port_id,
1011 to_many,
1012 unpaired,
1013 serialize_fn,
1014 instantiate_fn,
1015 input,
1016 op_metadata,
1017 } => HydroRoot::SendExternal {
1018 to_external_key: *to_external_key,
1019 to_port_id: *to_port_id,
1020 to_many: *to_many,
1021 unpaired: *unpaired,
1022 serialize_fn: serialize_fn.clone(),
1023 instantiate_fn: instantiate_fn.clone(),
1024 input: Box::new(input.deep_clone(seen_tees)),
1025 op_metadata: op_metadata.clone(),
1026 },
1027 HydroRoot::DestSink {
1028 sink,
1029 input,
1030 op_metadata,
1031 } => HydroRoot::DestSink {
1032 sink: sink.clone(),
1033 input: Box::new(input.deep_clone(seen_tees)),
1034 op_metadata: op_metadata.clone(),
1035 },
1036 HydroRoot::CycleSink {
1037 ident,
1038 input,
1039 op_metadata,
1040 } => HydroRoot::CycleSink {
1041 ident: ident.clone(),
1042 input: Box::new(input.deep_clone(seen_tees)),
1043 op_metadata: op_metadata.clone(),
1044 },
1045 HydroRoot::EmbeddedOutput {
1046 ident,
1047 input,
1048 op_metadata,
1049 } => HydroRoot::EmbeddedOutput {
1050 ident: ident.clone(),
1051 input: Box::new(input.deep_clone(seen_tees)),
1052 op_metadata: op_metadata.clone(),
1053 },
1054 }
1055 }
1056
1057 #[cfg(feature = "build")]
1058 pub fn emit<'a, D: Deploy<'a>>(
1059 &mut self,
1060 graph_builders: &mut dyn DfirBuilder,
1061 seen_tees: &mut SeenTees,
1062 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1063 next_stmt_id: &mut usize,
1064 ) {
1065 self.emit_core::<D>(
1066 &mut BuildersOrCallback::<
1067 fn(&mut HydroRoot, &mut usize),
1068 fn(&mut HydroNode, &mut usize),
1069 >::Builders(graph_builders),
1070 seen_tees,
1071 built_tees,
1072 next_stmt_id,
1073 );
1074 }
1075
1076 #[cfg(feature = "build")]
1077 pub fn emit_core<'a, D: Deploy<'a>>(
1078 &mut self,
1079 builders_or_callback: &mut BuildersOrCallback<
1080 impl FnMut(&mut HydroRoot, &mut usize),
1081 impl FnMut(&mut HydroNode, &mut usize),
1082 >,
1083 seen_tees: &mut SeenTees,
1084 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1085 next_stmt_id: &mut usize,
1086 ) {
1087 match self {
1088 HydroRoot::ForEach { f, input, .. } => {
1089 let input_ident =
1090 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1091
1092 match builders_or_callback {
1093 BuildersOrCallback::Builders(graph_builders) => {
1094 graph_builders
1095 .get_dfir_mut(&input.metadata().location_id)
1096 .add_dfir(
1097 parse_quote! {
1098 #input_ident -> for_each(#f);
1099 },
1100 None,
1101 Some(&next_stmt_id.to_string()),
1102 );
1103 }
1104 BuildersOrCallback::Callback(leaf_callback, _) => {
1105 leaf_callback(self, next_stmt_id);
1106 }
1107 }
1108
1109 *next_stmt_id += 1;
1110 }
1111
1112 HydroRoot::SendExternal {
1113 serialize_fn,
1114 instantiate_fn,
1115 input,
1116 ..
1117 } => {
1118 let input_ident =
1119 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1120
1121 match builders_or_callback {
1122 BuildersOrCallback::Builders(graph_builders) => {
1123 let (sink_expr, _) = match instantiate_fn {
1124 DebugInstantiate::Building => (
1125 syn::parse_quote!(DUMMY_SINK),
1126 syn::parse_quote!(DUMMY_SOURCE),
1127 ),
1128
1129 DebugInstantiate::Finalized(finalized) => {
1130 (finalized.sink.clone(), finalized.source.clone())
1131 }
1132 };
1133
1134 graph_builders.create_external_output(
1135 &input.metadata().location_id,
1136 sink_expr,
1137 &input_ident,
1138 serialize_fn.as_ref(),
1139 *next_stmt_id,
1140 );
1141 }
1142 BuildersOrCallback::Callback(leaf_callback, _) => {
1143 leaf_callback(self, next_stmt_id);
1144 }
1145 }
1146
1147 *next_stmt_id += 1;
1148 }
1149
1150 HydroRoot::DestSink { sink, input, .. } => {
1151 let input_ident =
1152 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1153
1154 match builders_or_callback {
1155 BuildersOrCallback::Builders(graph_builders) => {
1156 graph_builders
1157 .get_dfir_mut(&input.metadata().location_id)
1158 .add_dfir(
1159 parse_quote! {
1160 #input_ident -> dest_sink(#sink);
1161 },
1162 None,
1163 Some(&next_stmt_id.to_string()),
1164 );
1165 }
1166 BuildersOrCallback::Callback(leaf_callback, _) => {
1167 leaf_callback(self, next_stmt_id);
1168 }
1169 }
1170
1171 *next_stmt_id += 1;
1172 }
1173
1174 HydroRoot::CycleSink { ident, input, .. } => {
1175 let input_ident =
1176 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1177
1178 match builders_or_callback {
1179 BuildersOrCallback::Builders(graph_builders) => {
1180 let elem_type: syn::Type = match &input.metadata().collection_kind {
1181 CollectionKind::KeyedSingleton {
1182 key_type,
1183 value_type,
1184 ..
1185 }
1186 | CollectionKind::KeyedStream {
1187 key_type,
1188 value_type,
1189 ..
1190 } => {
1191 parse_quote!((#key_type, #value_type))
1192 }
1193 CollectionKind::Stream { element_type, .. }
1194 | CollectionKind::Singleton { element_type, .. }
1195 | CollectionKind::Optional { element_type, .. } => {
1196 parse_quote!(#element_type)
1197 }
1198 };
1199
1200 graph_builders
1201 .get_dfir_mut(&input.metadata().location_id)
1202 .add_dfir(
1203 parse_quote! {
1204 #ident = #input_ident -> identity::<#elem_type>();
1205 },
1206 None,
1207 None,
1208 );
1209 }
1210 BuildersOrCallback::Callback(_, _) => {}
1212 }
1213 }
1214
1215 HydroRoot::EmbeddedOutput { ident, input, .. } => {
1216 let input_ident =
1217 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1218
1219 match builders_or_callback {
1220 BuildersOrCallback::Builders(graph_builders) => {
1221 graph_builders
1222 .get_dfir_mut(&input.metadata().location_id)
1223 .add_dfir(
1224 parse_quote! {
1225 #input_ident -> for_each(&mut #ident);
1226 },
1227 None,
1228 Some(&next_stmt_id.to_string()),
1229 );
1230 }
1231 BuildersOrCallback::Callback(leaf_callback, _) => {
1232 leaf_callback(self, next_stmt_id);
1233 }
1234 }
1235
1236 *next_stmt_id += 1;
1237 }
1238 }
1239 }
1240
1241 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1242 match self {
1243 HydroRoot::ForEach { op_metadata, .. }
1244 | HydroRoot::SendExternal { op_metadata, .. }
1245 | HydroRoot::DestSink { op_metadata, .. }
1246 | HydroRoot::CycleSink { op_metadata, .. }
1247 | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1248 }
1249 }
1250
1251 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1252 match self {
1253 HydroRoot::ForEach { op_metadata, .. }
1254 | HydroRoot::SendExternal { op_metadata, .. }
1255 | HydroRoot::DestSink { op_metadata, .. }
1256 | HydroRoot::CycleSink { op_metadata, .. }
1257 | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1258 }
1259 }
1260
1261 pub fn input(&self) -> &HydroNode {
1262 match self {
1263 HydroRoot::ForEach { input, .. }
1264 | HydroRoot::SendExternal { input, .. }
1265 | HydroRoot::DestSink { input, .. }
1266 | HydroRoot::CycleSink { input, .. }
1267 | HydroRoot::EmbeddedOutput { input, .. } => input,
1268 }
1269 }
1270
1271 pub fn input_metadata(&self) -> &HydroIrMetadata {
1272 self.input().metadata()
1273 }
1274
1275 pub fn print_root(&self) -> String {
1276 match self {
1277 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1278 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1279 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1280 HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1281 HydroRoot::EmbeddedOutput { ident, .. } => {
1282 format!("EmbeddedOutput({})", ident)
1283 }
1284 }
1285 }
1286
1287 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1288 match self {
1289 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1290 transform(f);
1291 }
1292 HydroRoot::SendExternal { .. }
1293 | HydroRoot::CycleSink { .. }
1294 | HydroRoot::EmbeddedOutput { .. } => {}
1295 }
1296 }
1297}
1298
1299#[cfg(feature = "build")]
1300pub fn emit<'a, D: Deploy<'a>>(
1301 ir: &mut Vec<HydroRoot>,
1302) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1303 let mut builders = SecondaryMap::new();
1304 let mut seen_tees = HashMap::new();
1305 let mut built_tees = HashMap::new();
1306 let mut next_stmt_id = 0;
1307 for leaf in ir {
1308 leaf.emit::<D>(
1309 &mut builders,
1310 &mut seen_tees,
1311 &mut built_tees,
1312 &mut next_stmt_id,
1313 );
1314 }
1315 builders
1316}
1317
1318#[cfg(feature = "build")]
1319pub fn traverse_dfir<'a, D: Deploy<'a>>(
1320 ir: &mut [HydroRoot],
1321 transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1322 transform_node: impl FnMut(&mut HydroNode, &mut usize),
1323) {
1324 let mut seen_tees = HashMap::new();
1325 let mut built_tees = HashMap::new();
1326 let mut next_stmt_id = 0;
1327 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1328 ir.iter_mut().for_each(|leaf| {
1329 leaf.emit_core::<D>(
1330 &mut callback,
1331 &mut seen_tees,
1332 &mut built_tees,
1333 &mut next_stmt_id,
1334 );
1335 });
1336}
1337
1338pub fn transform_bottom_up(
1339 ir: &mut [HydroRoot],
1340 transform_root: &mut impl FnMut(&mut HydroRoot),
1341 transform_node: &mut impl FnMut(&mut HydroNode),
1342 check_well_formed: bool,
1343) {
1344 let mut seen_tees = HashMap::new();
1345 ir.iter_mut().for_each(|leaf| {
1346 leaf.transform_bottom_up(
1347 transform_root,
1348 transform_node,
1349 &mut seen_tees,
1350 check_well_formed,
1351 );
1352 });
1353}
1354
1355pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1356 let mut seen_tees = HashMap::new();
1357 ir.iter()
1358 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1359 .collect()
1360}
1361
1362type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1363thread_local! {
1364 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1365}
1366
1367pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1368 PRINTED_TEES.with(|printed_tees| {
1369 let mut printed_tees_mut = printed_tees.borrow_mut();
1370 *printed_tees_mut = Some((0, HashMap::new()));
1371 drop(printed_tees_mut);
1372
1373 let ret = f();
1374
1375 let mut printed_tees_mut = printed_tees.borrow_mut();
1376 *printed_tees_mut = None;
1377
1378 ret
1379 })
1380}
1381
1382pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1383
1384impl TeeNode {
1385 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1386 Rc::as_ptr(&self.0)
1387 }
1388}
1389
1390impl Debug for TeeNode {
1391 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1392 PRINTED_TEES.with(|printed_tees| {
1393 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1394 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1395
1396 if let Some(printed_tees_mut) = printed_tees_mut {
1397 if let Some(existing) = printed_tees_mut
1398 .1
1399 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1400 {
1401 write!(f, "<tee {}>", existing)
1402 } else {
1403 let next_id = printed_tees_mut.0;
1404 printed_tees_mut.0 += 1;
1405 printed_tees_mut
1406 .1
1407 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1408 drop(printed_tees_mut_borrow);
1409 write!(f, "<tee {}>: ", next_id)?;
1410 Debug::fmt(&self.0.borrow(), f)
1411 }
1412 } else {
1413 drop(printed_tees_mut_borrow);
1414 write!(f, "<tee>: ")?;
1415 Debug::fmt(&self.0.borrow(), f)
1416 }
1417 })
1418 }
1419}
1420
1421impl Hash for TeeNode {
1422 fn hash<H: Hasher>(&self, state: &mut H) {
1423 self.0.borrow_mut().hash(state);
1424 }
1425}
1426
1427#[derive(Clone, PartialEq, Eq, Debug)]
1428pub enum BoundKind {
1429 Unbounded,
1430 Bounded,
1431}
1432
1433#[derive(Clone, PartialEq, Eq, Debug)]
1434pub enum StreamOrder {
1435 NoOrder,
1436 TotalOrder,
1437}
1438
1439#[derive(Clone, PartialEq, Eq, Debug)]
1440pub enum StreamRetry {
1441 AtLeastOnce,
1442 ExactlyOnce,
1443}
1444
1445#[derive(Clone, PartialEq, Eq, Debug)]
1446pub enum KeyedSingletonBoundKind {
1447 Unbounded,
1448 BoundedValue,
1449 Bounded,
1450}
1451
1452#[derive(Clone, PartialEq, Eq, Debug)]
1453pub enum CollectionKind {
1454 Stream {
1455 bound: BoundKind,
1456 order: StreamOrder,
1457 retry: StreamRetry,
1458 element_type: DebugType,
1459 },
1460 Singleton {
1461 bound: BoundKind,
1462 element_type: DebugType,
1463 },
1464 Optional {
1465 bound: BoundKind,
1466 element_type: DebugType,
1467 },
1468 KeyedStream {
1469 bound: BoundKind,
1470 value_order: StreamOrder,
1471 value_retry: StreamRetry,
1472 key_type: DebugType,
1473 value_type: DebugType,
1474 },
1475 KeyedSingleton {
1476 bound: KeyedSingletonBoundKind,
1477 key_type: DebugType,
1478 value_type: DebugType,
1479 },
1480}
1481
1482impl CollectionKind {
1483 pub fn is_bounded(&self) -> bool {
1484 matches!(
1485 self,
1486 CollectionKind::Stream {
1487 bound: BoundKind::Bounded,
1488 ..
1489 } | CollectionKind::Singleton {
1490 bound: BoundKind::Bounded,
1491 ..
1492 } | CollectionKind::Optional {
1493 bound: BoundKind::Bounded,
1494 ..
1495 } | CollectionKind::KeyedStream {
1496 bound: BoundKind::Bounded,
1497 ..
1498 } | CollectionKind::KeyedSingleton {
1499 bound: KeyedSingletonBoundKind::Bounded,
1500 ..
1501 }
1502 )
1503 }
1504}
1505
1506#[derive(Clone)]
1507pub struct HydroIrMetadata {
1508 pub location_id: LocationId,
1509 pub collection_kind: CollectionKind,
1510 pub cardinality: Option<usize>,
1511 pub tag: Option<String>,
1512 pub op: HydroIrOpMetadata,
1513}
1514
1515impl Hash for HydroIrMetadata {
1517 fn hash<H: Hasher>(&self, _: &mut H) {}
1518}
1519
1520impl PartialEq for HydroIrMetadata {
1521 fn eq(&self, _: &Self) -> bool {
1522 true
1523 }
1524}
1525
1526impl Eq for HydroIrMetadata {}
1527
1528impl Debug for HydroIrMetadata {
1529 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1530 f.debug_struct("HydroIrMetadata")
1531 .field("location_id", &self.location_id)
1532 .field("collection_kind", &self.collection_kind)
1533 .finish()
1534 }
1535}
1536
1537#[derive(Clone)]
1540pub struct HydroIrOpMetadata {
1541 pub backtrace: Backtrace,
1542 pub cpu_usage: Option<f64>,
1543 pub network_recv_cpu_usage: Option<f64>,
1544 pub id: Option<usize>,
1545}
1546
1547impl HydroIrOpMetadata {
1548 #[expect(
1549 clippy::new_without_default,
1550 reason = "explicit calls to new ensure correct backtrace bounds"
1551 )]
1552 pub fn new() -> HydroIrOpMetadata {
1553 Self::new_with_skip(1)
1554 }
1555
1556 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1557 HydroIrOpMetadata {
1558 backtrace: Backtrace::get_backtrace(2 + skip_count),
1559 cpu_usage: None,
1560 network_recv_cpu_usage: None,
1561 id: None,
1562 }
1563 }
1564}
1565
1566impl Debug for HydroIrOpMetadata {
1567 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1568 f.debug_struct("HydroIrOpMetadata").finish()
1569 }
1570}
1571
1572impl Hash for HydroIrOpMetadata {
1573 fn hash<H: Hasher>(&self, _: &mut H) {}
1574}
1575
1576#[derive(Debug, Hash)]
1579pub enum HydroNode {
1580 Placeholder,
1581
1582 Cast {
1590 inner: Box<HydroNode>,
1591 metadata: HydroIrMetadata,
1592 },
1593
1594 ObserveNonDet {
1600 inner: Box<HydroNode>,
1601 trusted: bool, metadata: HydroIrMetadata,
1603 },
1604
1605 Source {
1606 source: HydroSource,
1607 metadata: HydroIrMetadata,
1608 },
1609
1610 SingletonSource {
1611 value: DebugExpr,
1612 metadata: HydroIrMetadata,
1613 },
1614
1615 CycleSource {
1616 ident: syn::Ident,
1617 metadata: HydroIrMetadata,
1618 },
1619
1620 Tee {
1621 inner: TeeNode,
1622 metadata: HydroIrMetadata,
1623 },
1624
1625 BeginAtomic {
1626 inner: Box<HydroNode>,
1627 metadata: HydroIrMetadata,
1628 },
1629
1630 EndAtomic {
1631 inner: Box<HydroNode>,
1632 metadata: HydroIrMetadata,
1633 },
1634
1635 Batch {
1636 inner: Box<HydroNode>,
1637 metadata: HydroIrMetadata,
1638 },
1639
1640 YieldConcat {
1641 inner: Box<HydroNode>,
1642 metadata: HydroIrMetadata,
1643 },
1644
1645 Chain {
1646 first: Box<HydroNode>,
1647 second: Box<HydroNode>,
1648 metadata: HydroIrMetadata,
1649 },
1650
1651 ChainFirst {
1652 first: Box<HydroNode>,
1653 second: Box<HydroNode>,
1654 metadata: HydroIrMetadata,
1655 },
1656
1657 CrossProduct {
1658 left: Box<HydroNode>,
1659 right: Box<HydroNode>,
1660 metadata: HydroIrMetadata,
1661 },
1662
1663 CrossSingleton {
1664 left: Box<HydroNode>,
1665 right: Box<HydroNode>,
1666 metadata: HydroIrMetadata,
1667 },
1668
1669 Join {
1670 left: Box<HydroNode>,
1671 right: Box<HydroNode>,
1672 metadata: HydroIrMetadata,
1673 },
1674
1675 Difference {
1676 pos: Box<HydroNode>,
1677 neg: Box<HydroNode>,
1678 metadata: HydroIrMetadata,
1679 },
1680
1681 AntiJoin {
1682 pos: Box<HydroNode>,
1683 neg: Box<HydroNode>,
1684 metadata: HydroIrMetadata,
1685 },
1686
1687 ResolveFutures {
1688 input: Box<HydroNode>,
1689 metadata: HydroIrMetadata,
1690 },
1691 ResolveFuturesOrdered {
1692 input: Box<HydroNode>,
1693 metadata: HydroIrMetadata,
1694 },
1695
1696 Map {
1697 f: DebugExpr,
1698 input: Box<HydroNode>,
1699 metadata: HydroIrMetadata,
1700 },
1701 FlatMap {
1702 f: DebugExpr,
1703 input: Box<HydroNode>,
1704 metadata: HydroIrMetadata,
1705 },
1706 Filter {
1707 f: DebugExpr,
1708 input: Box<HydroNode>,
1709 metadata: HydroIrMetadata,
1710 },
1711 FilterMap {
1712 f: DebugExpr,
1713 input: Box<HydroNode>,
1714 metadata: HydroIrMetadata,
1715 },
1716
1717 DeferTick {
1718 input: Box<HydroNode>,
1719 metadata: HydroIrMetadata,
1720 },
1721 Enumerate {
1722 input: Box<HydroNode>,
1723 metadata: HydroIrMetadata,
1724 },
1725 Inspect {
1726 f: DebugExpr,
1727 input: Box<HydroNode>,
1728 metadata: HydroIrMetadata,
1729 },
1730
1731 Unique {
1732 input: Box<HydroNode>,
1733 metadata: HydroIrMetadata,
1734 },
1735
1736 Sort {
1737 input: Box<HydroNode>,
1738 metadata: HydroIrMetadata,
1739 },
1740 Fold {
1741 init: DebugExpr,
1742 acc: DebugExpr,
1743 input: Box<HydroNode>,
1744 metadata: HydroIrMetadata,
1745 },
1746
1747 Scan {
1748 init: DebugExpr,
1749 acc: DebugExpr,
1750 input: Box<HydroNode>,
1751 metadata: HydroIrMetadata,
1752 },
1753 FoldKeyed {
1754 init: DebugExpr,
1755 acc: DebugExpr,
1756 input: Box<HydroNode>,
1757 metadata: HydroIrMetadata,
1758 },
1759
1760 Reduce {
1761 f: DebugExpr,
1762 input: Box<HydroNode>,
1763 metadata: HydroIrMetadata,
1764 },
1765 ReduceKeyed {
1766 f: DebugExpr,
1767 input: Box<HydroNode>,
1768 metadata: HydroIrMetadata,
1769 },
1770 ReduceKeyedWatermark {
1771 f: DebugExpr,
1772 input: Box<HydroNode>,
1773 watermark: Box<HydroNode>,
1774 metadata: HydroIrMetadata,
1775 },
1776
1777 Network {
1778 name: Option<String>,
1779 serialize_fn: Option<DebugExpr>,
1780 instantiate_fn: DebugInstantiate,
1781 deserialize_fn: Option<DebugExpr>,
1782 input: Box<HydroNode>,
1783 metadata: HydroIrMetadata,
1784 },
1785
1786 ExternalInput {
1787 from_external_key: LocationKey,
1788 from_port_id: ExternalPortId,
1789 from_many: bool,
1790 codec_type: DebugType,
1791 port_hint: NetworkHint,
1792 instantiate_fn: DebugInstantiate,
1793 deserialize_fn: Option<DebugExpr>,
1794 metadata: HydroIrMetadata,
1795 },
1796
1797 Counter {
1798 tag: String,
1799 duration: DebugExpr,
1800 prefix: String,
1801 input: Box<HydroNode>,
1802 metadata: HydroIrMetadata,
1803 },
1804}
1805
1806pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1807pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1808
1809impl HydroNode {
1810 pub fn transform_bottom_up(
1811 &mut self,
1812 transform: &mut impl FnMut(&mut HydroNode),
1813 seen_tees: &mut SeenTees,
1814 check_well_formed: bool,
1815 ) {
1816 self.transform_children(
1817 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1818 seen_tees,
1819 );
1820
1821 transform(self);
1822
1823 let self_location = self.metadata().location_id.root();
1824
1825 if check_well_formed {
1826 match &*self {
1827 HydroNode::Network { .. } => {}
1828 _ => {
1829 self.input_metadata().iter().for_each(|i| {
1830 if i.location_id.root() != self_location {
1831 panic!(
1832 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1833 i,
1834 i.location_id.root(),
1835 self,
1836 self_location
1837 )
1838 }
1839 });
1840 }
1841 }
1842 }
1843 }
1844
1845 #[inline(always)]
1846 pub fn transform_children(
1847 &mut self,
1848 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1849 seen_tees: &mut SeenTees,
1850 ) {
1851 match self {
1852 HydroNode::Placeholder => {
1853 panic!();
1854 }
1855
1856 HydroNode::Source { .. }
1857 | HydroNode::SingletonSource { .. }
1858 | HydroNode::CycleSource { .. }
1859 | HydroNode::ExternalInput { .. } => {}
1860
1861 HydroNode::Tee { inner, .. } => {
1862 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1863 *inner = TeeNode(transformed.clone());
1864 } else {
1865 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1866 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1867 let mut orig = inner.0.replace(HydroNode::Placeholder);
1868 transform(&mut orig, seen_tees);
1869 *transformed_cell.borrow_mut() = orig;
1870 *inner = TeeNode(transformed_cell);
1871 }
1872 }
1873
1874 HydroNode::Cast { inner, .. }
1875 | HydroNode::ObserveNonDet { inner, .. }
1876 | HydroNode::BeginAtomic { inner, .. }
1877 | HydroNode::EndAtomic { inner, .. }
1878 | HydroNode::Batch { inner, .. }
1879 | HydroNode::YieldConcat { inner, .. } => {
1880 transform(inner.as_mut(), seen_tees);
1881 }
1882
1883 HydroNode::Chain { first, second, .. } => {
1884 transform(first.as_mut(), seen_tees);
1885 transform(second.as_mut(), seen_tees);
1886 }
1887
1888 HydroNode::ChainFirst { first, second, .. } => {
1889 transform(first.as_mut(), seen_tees);
1890 transform(second.as_mut(), seen_tees);
1891 }
1892
1893 HydroNode::CrossSingleton { left, right, .. }
1894 | HydroNode::CrossProduct { left, right, .. }
1895 | HydroNode::Join { left, right, .. } => {
1896 transform(left.as_mut(), seen_tees);
1897 transform(right.as_mut(), seen_tees);
1898 }
1899
1900 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1901 transform(pos.as_mut(), seen_tees);
1902 transform(neg.as_mut(), seen_tees);
1903 }
1904
1905 HydroNode::ReduceKeyedWatermark {
1906 input, watermark, ..
1907 } => {
1908 transform(input.as_mut(), seen_tees);
1909 transform(watermark.as_mut(), seen_tees);
1910 }
1911
1912 HydroNode::Map { input, .. }
1913 | HydroNode::ResolveFutures { input, .. }
1914 | HydroNode::ResolveFuturesOrdered { input, .. }
1915 | HydroNode::FlatMap { input, .. }
1916 | HydroNode::Filter { input, .. }
1917 | HydroNode::FilterMap { input, .. }
1918 | HydroNode::Sort { input, .. }
1919 | HydroNode::DeferTick { input, .. }
1920 | HydroNode::Enumerate { input, .. }
1921 | HydroNode::Inspect { input, .. }
1922 | HydroNode::Unique { input, .. }
1923 | HydroNode::Network { input, .. }
1924 | HydroNode::Fold { input, .. }
1925 | HydroNode::Scan { input, .. }
1926 | HydroNode::FoldKeyed { input, .. }
1927 | HydroNode::Reduce { input, .. }
1928 | HydroNode::ReduceKeyed { input, .. }
1929 | HydroNode::Counter { input, .. } => {
1930 transform(input.as_mut(), seen_tees);
1931 }
1932 }
1933 }
1934
1935 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1936 match self {
1937 HydroNode::Placeholder => HydroNode::Placeholder,
1938 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1939 inner: Box::new(inner.deep_clone(seen_tees)),
1940 metadata: metadata.clone(),
1941 },
1942 HydroNode::ObserveNonDet {
1943 inner,
1944 trusted,
1945 metadata,
1946 } => HydroNode::ObserveNonDet {
1947 inner: Box::new(inner.deep_clone(seen_tees)),
1948 trusted: *trusted,
1949 metadata: metadata.clone(),
1950 },
1951 HydroNode::Source { source, metadata } => HydroNode::Source {
1952 source: source.clone(),
1953 metadata: metadata.clone(),
1954 },
1955 HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1956 value: value.clone(),
1957 metadata: metadata.clone(),
1958 },
1959 HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1960 ident: ident.clone(),
1961 metadata: metadata.clone(),
1962 },
1963 HydroNode::Tee { inner, metadata } => {
1964 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1965 HydroNode::Tee {
1966 inner: TeeNode(transformed.clone()),
1967 metadata: metadata.clone(),
1968 }
1969 } else {
1970 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1971 seen_tees.insert(inner.as_ptr(), new_rc.clone());
1972 let cloned = inner.0.borrow().deep_clone(seen_tees);
1973 *new_rc.borrow_mut() = cloned;
1974 HydroNode::Tee {
1975 inner: TeeNode(new_rc),
1976 metadata: metadata.clone(),
1977 }
1978 }
1979 }
1980 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1981 inner: Box::new(inner.deep_clone(seen_tees)),
1982 metadata: metadata.clone(),
1983 },
1984 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1985 inner: Box::new(inner.deep_clone(seen_tees)),
1986 metadata: metadata.clone(),
1987 },
1988 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1989 inner: Box::new(inner.deep_clone(seen_tees)),
1990 metadata: metadata.clone(),
1991 },
1992 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1993 inner: Box::new(inner.deep_clone(seen_tees)),
1994 metadata: metadata.clone(),
1995 },
1996 HydroNode::Chain {
1997 first,
1998 second,
1999 metadata,
2000 } => HydroNode::Chain {
2001 first: Box::new(first.deep_clone(seen_tees)),
2002 second: Box::new(second.deep_clone(seen_tees)),
2003 metadata: metadata.clone(),
2004 },
2005 HydroNode::ChainFirst {
2006 first,
2007 second,
2008 metadata,
2009 } => HydroNode::ChainFirst {
2010 first: Box::new(first.deep_clone(seen_tees)),
2011 second: Box::new(second.deep_clone(seen_tees)),
2012 metadata: metadata.clone(),
2013 },
2014 HydroNode::CrossProduct {
2015 left,
2016 right,
2017 metadata,
2018 } => HydroNode::CrossProduct {
2019 left: Box::new(left.deep_clone(seen_tees)),
2020 right: Box::new(right.deep_clone(seen_tees)),
2021 metadata: metadata.clone(),
2022 },
2023 HydroNode::CrossSingleton {
2024 left,
2025 right,
2026 metadata,
2027 } => HydroNode::CrossSingleton {
2028 left: Box::new(left.deep_clone(seen_tees)),
2029 right: Box::new(right.deep_clone(seen_tees)),
2030 metadata: metadata.clone(),
2031 },
2032 HydroNode::Join {
2033 left,
2034 right,
2035 metadata,
2036 } => HydroNode::Join {
2037 left: Box::new(left.deep_clone(seen_tees)),
2038 right: Box::new(right.deep_clone(seen_tees)),
2039 metadata: metadata.clone(),
2040 },
2041 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2042 pos: Box::new(pos.deep_clone(seen_tees)),
2043 neg: Box::new(neg.deep_clone(seen_tees)),
2044 metadata: metadata.clone(),
2045 },
2046 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2047 pos: Box::new(pos.deep_clone(seen_tees)),
2048 neg: Box::new(neg.deep_clone(seen_tees)),
2049 metadata: metadata.clone(),
2050 },
2051 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2052 input: Box::new(input.deep_clone(seen_tees)),
2053 metadata: metadata.clone(),
2054 },
2055 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2056 HydroNode::ResolveFuturesOrdered {
2057 input: Box::new(input.deep_clone(seen_tees)),
2058 metadata: metadata.clone(),
2059 }
2060 }
2061 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2062 f: f.clone(),
2063 input: Box::new(input.deep_clone(seen_tees)),
2064 metadata: metadata.clone(),
2065 },
2066 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2067 f: f.clone(),
2068 input: Box::new(input.deep_clone(seen_tees)),
2069 metadata: metadata.clone(),
2070 },
2071 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2072 f: f.clone(),
2073 input: Box::new(input.deep_clone(seen_tees)),
2074 metadata: metadata.clone(),
2075 },
2076 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2077 f: f.clone(),
2078 input: Box::new(input.deep_clone(seen_tees)),
2079 metadata: metadata.clone(),
2080 },
2081 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2082 input: Box::new(input.deep_clone(seen_tees)),
2083 metadata: metadata.clone(),
2084 },
2085 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2086 input: Box::new(input.deep_clone(seen_tees)),
2087 metadata: metadata.clone(),
2088 },
2089 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2090 f: f.clone(),
2091 input: Box::new(input.deep_clone(seen_tees)),
2092 metadata: metadata.clone(),
2093 },
2094 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2095 input: Box::new(input.deep_clone(seen_tees)),
2096 metadata: metadata.clone(),
2097 },
2098 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2099 input: Box::new(input.deep_clone(seen_tees)),
2100 metadata: metadata.clone(),
2101 },
2102 HydroNode::Fold {
2103 init,
2104 acc,
2105 input,
2106 metadata,
2107 } => HydroNode::Fold {
2108 init: init.clone(),
2109 acc: acc.clone(),
2110 input: Box::new(input.deep_clone(seen_tees)),
2111 metadata: metadata.clone(),
2112 },
2113 HydroNode::Scan {
2114 init,
2115 acc,
2116 input,
2117 metadata,
2118 } => HydroNode::Scan {
2119 init: init.clone(),
2120 acc: acc.clone(),
2121 input: Box::new(input.deep_clone(seen_tees)),
2122 metadata: metadata.clone(),
2123 },
2124 HydroNode::FoldKeyed {
2125 init,
2126 acc,
2127 input,
2128 metadata,
2129 } => HydroNode::FoldKeyed {
2130 init: init.clone(),
2131 acc: acc.clone(),
2132 input: Box::new(input.deep_clone(seen_tees)),
2133 metadata: metadata.clone(),
2134 },
2135 HydroNode::ReduceKeyedWatermark {
2136 f,
2137 input,
2138 watermark,
2139 metadata,
2140 } => HydroNode::ReduceKeyedWatermark {
2141 f: f.clone(),
2142 input: Box::new(input.deep_clone(seen_tees)),
2143 watermark: Box::new(watermark.deep_clone(seen_tees)),
2144 metadata: metadata.clone(),
2145 },
2146 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2147 f: f.clone(),
2148 input: Box::new(input.deep_clone(seen_tees)),
2149 metadata: metadata.clone(),
2150 },
2151 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2152 f: f.clone(),
2153 input: Box::new(input.deep_clone(seen_tees)),
2154 metadata: metadata.clone(),
2155 },
2156 HydroNode::Network {
2157 name,
2158 serialize_fn,
2159 instantiate_fn,
2160 deserialize_fn,
2161 input,
2162 metadata,
2163 } => HydroNode::Network {
2164 name: name.clone(),
2165 serialize_fn: serialize_fn.clone(),
2166 instantiate_fn: instantiate_fn.clone(),
2167 deserialize_fn: deserialize_fn.clone(),
2168 input: Box::new(input.deep_clone(seen_tees)),
2169 metadata: metadata.clone(),
2170 },
2171 HydroNode::ExternalInput {
2172 from_external_key,
2173 from_port_id,
2174 from_many,
2175 codec_type,
2176 port_hint,
2177 instantiate_fn,
2178 deserialize_fn,
2179 metadata,
2180 } => HydroNode::ExternalInput {
2181 from_external_key: *from_external_key,
2182 from_port_id: *from_port_id,
2183 from_many: *from_many,
2184 codec_type: codec_type.clone(),
2185 port_hint: *port_hint,
2186 instantiate_fn: instantiate_fn.clone(),
2187 deserialize_fn: deserialize_fn.clone(),
2188 metadata: metadata.clone(),
2189 },
2190 HydroNode::Counter {
2191 tag,
2192 duration,
2193 prefix,
2194 input,
2195 metadata,
2196 } => HydroNode::Counter {
2197 tag: tag.clone(),
2198 duration: duration.clone(),
2199 prefix: prefix.clone(),
2200 input: Box::new(input.deep_clone(seen_tees)),
2201 metadata: metadata.clone(),
2202 },
2203 }
2204 }
2205
2206 #[cfg(feature = "build")]
2207 pub fn emit_core<'a, D: Deploy<'a>>(
2208 &mut self,
2209 builders_or_callback: &mut BuildersOrCallback<
2210 impl FnMut(&mut HydroRoot, &mut usize),
2211 impl FnMut(&mut HydroNode, &mut usize),
2212 >,
2213 seen_tees: &mut SeenTees,
2214 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2215 next_stmt_id: &mut usize,
2216 ) -> syn::Ident {
2217 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2218
2219 self.transform_bottom_up(
2220 &mut |node: &mut HydroNode| {
2221 let out_location = node.metadata().location_id.clone();
2222 match node {
2223 HydroNode::Placeholder => {
2224 panic!()
2225 }
2226
2227 HydroNode::Cast { .. } => {
2228 match builders_or_callback {
2231 BuildersOrCallback::Builders(_) => {}
2232 BuildersOrCallback::Callback(_, node_callback) => {
2233 node_callback(node, next_stmt_id);
2234 }
2235 }
2236
2237 *next_stmt_id += 1;
2238 }
2240
2241 HydroNode::ObserveNonDet {
2242 inner,
2243 trusted,
2244 metadata,
2245 ..
2246 } => {
2247 let inner_ident = ident_stack.pop().unwrap();
2248
2249 let observe_ident =
2250 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2251
2252 match builders_or_callback {
2253 BuildersOrCallback::Builders(graph_builders) => {
2254 graph_builders.observe_nondet(
2255 *trusted,
2256 &inner.metadata().location_id,
2257 inner_ident,
2258 &inner.metadata().collection_kind,
2259 &observe_ident,
2260 &metadata.collection_kind,
2261 &metadata.op,
2262 );
2263 }
2264 BuildersOrCallback::Callback(_, node_callback) => {
2265 node_callback(node, next_stmt_id);
2266 }
2267 }
2268
2269 *next_stmt_id += 1;
2270
2271 ident_stack.push(observe_ident);
2272 }
2273
2274 HydroNode::Batch {
2275 inner, metadata, ..
2276 } => {
2277 let inner_ident = ident_stack.pop().unwrap();
2278
2279 let batch_ident =
2280 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2281
2282 match builders_or_callback {
2283 BuildersOrCallback::Builders(graph_builders) => {
2284 graph_builders.batch(
2285 inner_ident,
2286 &inner.metadata().location_id,
2287 &inner.metadata().collection_kind,
2288 &batch_ident,
2289 &out_location,
2290 &metadata.op,
2291 );
2292 }
2293 BuildersOrCallback::Callback(_, node_callback) => {
2294 node_callback(node, next_stmt_id);
2295 }
2296 }
2297
2298 *next_stmt_id += 1;
2299
2300 ident_stack.push(batch_ident);
2301 }
2302
2303 HydroNode::YieldConcat { inner, .. } => {
2304 let inner_ident = ident_stack.pop().unwrap();
2305
2306 let yield_ident =
2307 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2308
2309 match builders_or_callback {
2310 BuildersOrCallback::Builders(graph_builders) => {
2311 graph_builders.yield_from_tick(
2312 inner_ident,
2313 &inner.metadata().location_id,
2314 &inner.metadata().collection_kind,
2315 &yield_ident,
2316 &out_location,
2317 );
2318 }
2319 BuildersOrCallback::Callback(_, node_callback) => {
2320 node_callback(node, next_stmt_id);
2321 }
2322 }
2323
2324 *next_stmt_id += 1;
2325
2326 ident_stack.push(yield_ident);
2327 }
2328
2329 HydroNode::BeginAtomic { inner, metadata } => {
2330 let inner_ident = ident_stack.pop().unwrap();
2331
2332 let begin_ident =
2333 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2334
2335 match builders_or_callback {
2336 BuildersOrCallback::Builders(graph_builders) => {
2337 graph_builders.begin_atomic(
2338 inner_ident,
2339 &inner.metadata().location_id,
2340 &inner.metadata().collection_kind,
2341 &begin_ident,
2342 &out_location,
2343 &metadata.op,
2344 );
2345 }
2346 BuildersOrCallback::Callback(_, node_callback) => {
2347 node_callback(node, next_stmt_id);
2348 }
2349 }
2350
2351 *next_stmt_id += 1;
2352
2353 ident_stack.push(begin_ident);
2354 }
2355
2356 HydroNode::EndAtomic { inner, .. } => {
2357 let inner_ident = ident_stack.pop().unwrap();
2358
2359 let end_ident =
2360 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2361
2362 match builders_or_callback {
2363 BuildersOrCallback::Builders(graph_builders) => {
2364 graph_builders.end_atomic(
2365 inner_ident,
2366 &inner.metadata().location_id,
2367 &inner.metadata().collection_kind,
2368 &end_ident,
2369 );
2370 }
2371 BuildersOrCallback::Callback(_, node_callback) => {
2372 node_callback(node, next_stmt_id);
2373 }
2374 }
2375
2376 *next_stmt_id += 1;
2377
2378 ident_stack.push(end_ident);
2379 }
2380
2381 HydroNode::Source {
2382 source, metadata, ..
2383 } => {
2384 if let HydroSource::ExternalNetwork() = source {
2385 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2386 } else {
2387 let source_ident =
2388 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2389
2390 let source_stmt = match source {
2391 HydroSource::Stream(expr) => {
2392 debug_assert!(metadata.location_id.is_top_level());
2393 parse_quote! {
2394 #source_ident = source_stream(#expr);
2395 }
2396 }
2397
2398 HydroSource::ExternalNetwork() => {
2399 unreachable!()
2400 }
2401
2402 HydroSource::Iter(expr) => {
2403 if metadata.location_id.is_top_level() {
2404 parse_quote! {
2405 #source_ident = source_iter(#expr);
2406 }
2407 } else {
2408 parse_quote! {
2410 #source_ident = source_iter(#expr) -> persist::<'static>();
2411 }
2412 }
2413 }
2414
2415 HydroSource::Spin() => {
2416 debug_assert!(metadata.location_id.is_top_level());
2417 parse_quote! {
2418 #source_ident = spin();
2419 }
2420 }
2421
2422 HydroSource::ClusterMembers(location_id) => {
2423 debug_assert!(metadata.location_id.is_top_level());
2424
2425 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2426 D::cluster_membership_stream(location_id),
2427 &(),
2428 );
2429
2430 parse_quote! {
2431 #source_ident = source_stream(#expr);
2432 }
2433 }
2434
2435 HydroSource::Embedded(ident) => {
2436 parse_quote! {
2437 #source_ident = source_stream(#ident);
2438 }
2439 }
2440 };
2441
2442 match builders_or_callback {
2443 BuildersOrCallback::Builders(graph_builders) => {
2444 let builder = graph_builders.get_dfir_mut(&out_location);
2445 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2446 }
2447 BuildersOrCallback::Callback(_, node_callback) => {
2448 node_callback(node, next_stmt_id);
2449 }
2450 }
2451
2452 *next_stmt_id += 1;
2453
2454 ident_stack.push(source_ident);
2455 }
2456 }
2457
2458 HydroNode::SingletonSource { value, metadata } => {
2459 let source_ident =
2460 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2461
2462 match builders_or_callback {
2463 BuildersOrCallback::Builders(graph_builders) => {
2464 let builder = graph_builders.get_dfir_mut(&out_location);
2465
2466 if metadata.location_id.is_top_level()
2467 && metadata.collection_kind.is_bounded()
2468 {
2469 builder.add_dfir(
2470 parse_quote! {
2471 #source_ident = source_iter([#value]);
2472 },
2473 None,
2474 Some(&next_stmt_id.to_string()),
2475 );
2476 } else {
2477 builder.add_dfir(
2478 parse_quote! {
2479 #source_ident = source_iter([#value]) -> persist::<'static>();
2480 },
2481 None,
2482 Some(&next_stmt_id.to_string()),
2483 );
2484 }
2485 }
2486 BuildersOrCallback::Callback(_, node_callback) => {
2487 node_callback(node, next_stmt_id);
2488 }
2489 }
2490
2491 *next_stmt_id += 1;
2492
2493 ident_stack.push(source_ident);
2494 }
2495
2496 HydroNode::CycleSource { ident, .. } => {
2497 let ident = ident.clone();
2498
2499 match builders_or_callback {
2500 BuildersOrCallback::Builders(_) => {}
2501 BuildersOrCallback::Callback(_, node_callback) => {
2502 node_callback(node, next_stmt_id);
2503 }
2504 }
2505
2506 *next_stmt_id += 1;
2508
2509 ident_stack.push(ident);
2510 }
2511
2512 HydroNode::Tee { inner, .. } => {
2513 let ret_ident = if let Some(teed_from) =
2514 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2515 {
2516 match builders_or_callback {
2517 BuildersOrCallback::Builders(_) => {}
2518 BuildersOrCallback::Callback(_, node_callback) => {
2519 node_callback(node, next_stmt_id);
2520 }
2521 }
2522
2523 teed_from.clone()
2524 } else {
2525 let inner_ident = ident_stack.pop().unwrap();
2528
2529 let tee_ident =
2530 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2531
2532 built_tees.insert(
2533 inner.0.as_ref() as *const RefCell<HydroNode>,
2534 tee_ident.clone(),
2535 );
2536
2537 match builders_or_callback {
2538 BuildersOrCallback::Builders(graph_builders) => {
2539 let builder = graph_builders.get_dfir_mut(&out_location);
2540 builder.add_dfir(
2541 parse_quote! {
2542 #tee_ident = #inner_ident -> tee();
2543 },
2544 None,
2545 Some(&next_stmt_id.to_string()),
2546 );
2547 }
2548 BuildersOrCallback::Callback(_, node_callback) => {
2549 node_callback(node, next_stmt_id);
2550 }
2551 }
2552
2553 tee_ident
2554 };
2555
2556 *next_stmt_id += 1;
2560 ident_stack.push(ret_ident);
2561 }
2562
2563 HydroNode::Chain { .. } => {
2564 let second_ident = ident_stack.pop().unwrap();
2566 let first_ident = ident_stack.pop().unwrap();
2567
2568 let chain_ident =
2569 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2570
2571 match builders_or_callback {
2572 BuildersOrCallback::Builders(graph_builders) => {
2573 let builder = graph_builders.get_dfir_mut(&out_location);
2574 builder.add_dfir(
2575 parse_quote! {
2576 #chain_ident = chain();
2577 #first_ident -> [0]#chain_ident;
2578 #second_ident -> [1]#chain_ident;
2579 },
2580 None,
2581 Some(&next_stmt_id.to_string()),
2582 );
2583 }
2584 BuildersOrCallback::Callback(_, node_callback) => {
2585 node_callback(node, next_stmt_id);
2586 }
2587 }
2588
2589 *next_stmt_id += 1;
2590
2591 ident_stack.push(chain_ident);
2592 }
2593
2594 HydroNode::ChainFirst { .. } => {
2595 let second_ident = ident_stack.pop().unwrap();
2596 let first_ident = ident_stack.pop().unwrap();
2597
2598 let chain_ident =
2599 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2600
2601 match builders_or_callback {
2602 BuildersOrCallback::Builders(graph_builders) => {
2603 let builder = graph_builders.get_dfir_mut(&out_location);
2604 builder.add_dfir(
2605 parse_quote! {
2606 #chain_ident = chain_first_n(1);
2607 #first_ident -> [0]#chain_ident;
2608 #second_ident -> [1]#chain_ident;
2609 },
2610 None,
2611 Some(&next_stmt_id.to_string()),
2612 );
2613 }
2614 BuildersOrCallback::Callback(_, node_callback) => {
2615 node_callback(node, next_stmt_id);
2616 }
2617 }
2618
2619 *next_stmt_id += 1;
2620
2621 ident_stack.push(chain_ident);
2622 }
2623
2624 HydroNode::CrossSingleton { right, .. } => {
2625 let right_ident = ident_stack.pop().unwrap();
2626 let left_ident = ident_stack.pop().unwrap();
2627
2628 let cross_ident =
2629 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2630
2631 match builders_or_callback {
2632 BuildersOrCallback::Builders(graph_builders) => {
2633 let builder = graph_builders.get_dfir_mut(&out_location);
2634
2635 if right.metadata().location_id.is_top_level()
2636 && right.metadata().collection_kind.is_bounded()
2637 {
2638 builder.add_dfir(
2639 parse_quote! {
2640 #cross_ident = cross_singleton();
2641 #left_ident -> [input]#cross_ident;
2642 #right_ident -> persist::<'static>() -> [single]#cross_ident;
2643 },
2644 None,
2645 Some(&next_stmt_id.to_string()),
2646 );
2647 } else {
2648 builder.add_dfir(
2649 parse_quote! {
2650 #cross_ident = cross_singleton();
2651 #left_ident -> [input]#cross_ident;
2652 #right_ident -> [single]#cross_ident;
2653 },
2654 None,
2655 Some(&next_stmt_id.to_string()),
2656 );
2657 }
2658 }
2659 BuildersOrCallback::Callback(_, node_callback) => {
2660 node_callback(node, next_stmt_id);
2661 }
2662 }
2663
2664 *next_stmt_id += 1;
2665
2666 ident_stack.push(cross_ident);
2667 }
2668
2669 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2670 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2671 parse_quote!(cross_join_multiset)
2672 } else {
2673 parse_quote!(join_multiset)
2674 };
2675
2676 let (HydroNode::CrossProduct { left, right, .. }
2677 | HydroNode::Join { left, right, .. }) = node
2678 else {
2679 unreachable!()
2680 };
2681
2682 let is_top_level = left.metadata().location_id.is_top_level()
2683 && right.metadata().location_id.is_top_level();
2684 let left_lifetime = if left.metadata().location_id.is_top_level() {
2685 quote!('static)
2686 } else {
2687 quote!('tick)
2688 };
2689
2690 let right_lifetime = if right.metadata().location_id.is_top_level() {
2691 quote!('static)
2692 } else {
2693 quote!('tick)
2694 };
2695
2696 let right_ident = ident_stack.pop().unwrap();
2697 let left_ident = ident_stack.pop().unwrap();
2698
2699 let stream_ident =
2700 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2701
2702 match builders_or_callback {
2703 BuildersOrCallback::Builders(graph_builders) => {
2704 let builder = graph_builders.get_dfir_mut(&out_location);
2705 builder.add_dfir(
2706 if is_top_level {
2707 parse_quote! {
2710 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2711 #left_ident -> [0]#stream_ident;
2712 #right_ident -> [1]#stream_ident;
2713 }
2714 } else {
2715 parse_quote! {
2716 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2717 #left_ident -> [0]#stream_ident;
2718 #right_ident -> [1]#stream_ident;
2719 }
2720 }
2721 ,
2722 None,
2723 Some(&next_stmt_id.to_string()),
2724 );
2725 }
2726 BuildersOrCallback::Callback(_, node_callback) => {
2727 node_callback(node, next_stmt_id);
2728 }
2729 }
2730
2731 *next_stmt_id += 1;
2732
2733 ident_stack.push(stream_ident);
2734 }
2735
2736 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2737 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2738 parse_quote!(difference)
2739 } else {
2740 parse_quote!(anti_join)
2741 };
2742
2743 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2744 node
2745 else {
2746 unreachable!()
2747 };
2748
2749 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2750 quote!('static)
2751 } else {
2752 quote!('tick)
2753 };
2754
2755 let neg_ident = ident_stack.pop().unwrap();
2756 let pos_ident = ident_stack.pop().unwrap();
2757
2758 let stream_ident =
2759 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2760
2761 match builders_or_callback {
2762 BuildersOrCallback::Builders(graph_builders) => {
2763 let builder = graph_builders.get_dfir_mut(&out_location);
2764 builder.add_dfir(
2765 parse_quote! {
2766 #stream_ident = #operator::<'tick, #neg_lifetime>();
2767 #pos_ident -> [pos]#stream_ident;
2768 #neg_ident -> [neg]#stream_ident;
2769 },
2770 None,
2771 Some(&next_stmt_id.to_string()),
2772 );
2773 }
2774 BuildersOrCallback::Callback(_, node_callback) => {
2775 node_callback(node, next_stmt_id);
2776 }
2777 }
2778
2779 *next_stmt_id += 1;
2780
2781 ident_stack.push(stream_ident);
2782 }
2783
2784 HydroNode::ResolveFutures { .. } => {
2785 let input_ident = ident_stack.pop().unwrap();
2786
2787 let futures_ident =
2788 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2789
2790 match builders_or_callback {
2791 BuildersOrCallback::Builders(graph_builders) => {
2792 let builder = graph_builders.get_dfir_mut(&out_location);
2793 builder.add_dfir(
2794 parse_quote! {
2795 #futures_ident = #input_ident -> resolve_futures();
2796 },
2797 None,
2798 Some(&next_stmt_id.to_string()),
2799 );
2800 }
2801 BuildersOrCallback::Callback(_, node_callback) => {
2802 node_callback(node, next_stmt_id);
2803 }
2804 }
2805
2806 *next_stmt_id += 1;
2807
2808 ident_stack.push(futures_ident);
2809 }
2810
2811 HydroNode::ResolveFuturesOrdered { .. } => {
2812 let input_ident = ident_stack.pop().unwrap();
2813
2814 let futures_ident =
2815 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2816
2817 match builders_or_callback {
2818 BuildersOrCallback::Builders(graph_builders) => {
2819 let builder = graph_builders.get_dfir_mut(&out_location);
2820 builder.add_dfir(
2821 parse_quote! {
2822 #futures_ident = #input_ident -> resolve_futures_ordered();
2823 },
2824 None,
2825 Some(&next_stmt_id.to_string()),
2826 );
2827 }
2828 BuildersOrCallback::Callback(_, node_callback) => {
2829 node_callback(node, next_stmt_id);
2830 }
2831 }
2832
2833 *next_stmt_id += 1;
2834
2835 ident_stack.push(futures_ident);
2836 }
2837
2838 HydroNode::Map { f, .. } => {
2839 let input_ident = ident_stack.pop().unwrap();
2840
2841 let map_ident =
2842 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2843
2844 match builders_or_callback {
2845 BuildersOrCallback::Builders(graph_builders) => {
2846 let builder = graph_builders.get_dfir_mut(&out_location);
2847 builder.add_dfir(
2848 parse_quote! {
2849 #map_ident = #input_ident -> map(#f);
2850 },
2851 None,
2852 Some(&next_stmt_id.to_string()),
2853 );
2854 }
2855 BuildersOrCallback::Callback(_, node_callback) => {
2856 node_callback(node, next_stmt_id);
2857 }
2858 }
2859
2860 *next_stmt_id += 1;
2861
2862 ident_stack.push(map_ident);
2863 }
2864
2865 HydroNode::FlatMap { f, .. } => {
2866 let input_ident = ident_stack.pop().unwrap();
2867
2868 let flat_map_ident =
2869 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2870
2871 match builders_or_callback {
2872 BuildersOrCallback::Builders(graph_builders) => {
2873 let builder = graph_builders.get_dfir_mut(&out_location);
2874 builder.add_dfir(
2875 parse_quote! {
2876 #flat_map_ident = #input_ident -> flat_map(#f);
2877 },
2878 None,
2879 Some(&next_stmt_id.to_string()),
2880 );
2881 }
2882 BuildersOrCallback::Callback(_, node_callback) => {
2883 node_callback(node, next_stmt_id);
2884 }
2885 }
2886
2887 *next_stmt_id += 1;
2888
2889 ident_stack.push(flat_map_ident);
2890 }
2891
2892 HydroNode::Filter { f, .. } => {
2893 let input_ident = ident_stack.pop().unwrap();
2894
2895 let filter_ident =
2896 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2897
2898 match builders_or_callback {
2899 BuildersOrCallback::Builders(graph_builders) => {
2900 let builder = graph_builders.get_dfir_mut(&out_location);
2901 builder.add_dfir(
2902 parse_quote! {
2903 #filter_ident = #input_ident -> filter(#f);
2904 },
2905 None,
2906 Some(&next_stmt_id.to_string()),
2907 );
2908 }
2909 BuildersOrCallback::Callback(_, node_callback) => {
2910 node_callback(node, next_stmt_id);
2911 }
2912 }
2913
2914 *next_stmt_id += 1;
2915
2916 ident_stack.push(filter_ident);
2917 }
2918
2919 HydroNode::FilterMap { f, .. } => {
2920 let input_ident = ident_stack.pop().unwrap();
2921
2922 let filter_map_ident =
2923 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2924
2925 match builders_or_callback {
2926 BuildersOrCallback::Builders(graph_builders) => {
2927 let builder = graph_builders.get_dfir_mut(&out_location);
2928 builder.add_dfir(
2929 parse_quote! {
2930 #filter_map_ident = #input_ident -> filter_map(#f);
2931 },
2932 None,
2933 Some(&next_stmt_id.to_string()),
2934 );
2935 }
2936 BuildersOrCallback::Callback(_, node_callback) => {
2937 node_callback(node, next_stmt_id);
2938 }
2939 }
2940
2941 *next_stmt_id += 1;
2942
2943 ident_stack.push(filter_map_ident);
2944 }
2945
2946 HydroNode::Sort { .. } => {
2947 let input_ident = ident_stack.pop().unwrap();
2948
2949 let sort_ident =
2950 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2951
2952 match builders_or_callback {
2953 BuildersOrCallback::Builders(graph_builders) => {
2954 let builder = graph_builders.get_dfir_mut(&out_location);
2955 builder.add_dfir(
2956 parse_quote! {
2957 #sort_ident = #input_ident -> sort();
2958 },
2959 None,
2960 Some(&next_stmt_id.to_string()),
2961 );
2962 }
2963 BuildersOrCallback::Callback(_, node_callback) => {
2964 node_callback(node, next_stmt_id);
2965 }
2966 }
2967
2968 *next_stmt_id += 1;
2969
2970 ident_stack.push(sort_ident);
2971 }
2972
2973 HydroNode::DeferTick { .. } => {
2974 let input_ident = ident_stack.pop().unwrap();
2975
2976 let defer_tick_ident =
2977 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2978
2979 match builders_or_callback {
2980 BuildersOrCallback::Builders(graph_builders) => {
2981 let builder = graph_builders.get_dfir_mut(&out_location);
2982 builder.add_dfir(
2983 parse_quote! {
2984 #defer_tick_ident = #input_ident -> defer_tick_lazy();
2985 },
2986 None,
2987 Some(&next_stmt_id.to_string()),
2988 );
2989 }
2990 BuildersOrCallback::Callback(_, node_callback) => {
2991 node_callback(node, next_stmt_id);
2992 }
2993 }
2994
2995 *next_stmt_id += 1;
2996
2997 ident_stack.push(defer_tick_ident);
2998 }
2999
3000 HydroNode::Enumerate { input, .. } => {
3001 let input_ident = ident_stack.pop().unwrap();
3002
3003 let enumerate_ident =
3004 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3005
3006 match builders_or_callback {
3007 BuildersOrCallback::Builders(graph_builders) => {
3008 let builder = graph_builders.get_dfir_mut(&out_location);
3009 let lifetime = if input.metadata().location_id.is_top_level() {
3010 quote!('static)
3011 } else {
3012 quote!('tick)
3013 };
3014 builder.add_dfir(
3015 parse_quote! {
3016 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3017 },
3018 None,
3019 Some(&next_stmt_id.to_string()),
3020 );
3021 }
3022 BuildersOrCallback::Callback(_, node_callback) => {
3023 node_callback(node, next_stmt_id);
3024 }
3025 }
3026
3027 *next_stmt_id += 1;
3028
3029 ident_stack.push(enumerate_ident);
3030 }
3031
3032 HydroNode::Inspect { f, .. } => {
3033 let input_ident = ident_stack.pop().unwrap();
3034
3035 let inspect_ident =
3036 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3037
3038 match builders_or_callback {
3039 BuildersOrCallback::Builders(graph_builders) => {
3040 let builder = graph_builders.get_dfir_mut(&out_location);
3041 builder.add_dfir(
3042 parse_quote! {
3043 #inspect_ident = #input_ident -> inspect(#f);
3044 },
3045 None,
3046 Some(&next_stmt_id.to_string()),
3047 );
3048 }
3049 BuildersOrCallback::Callback(_, node_callback) => {
3050 node_callback(node, next_stmt_id);
3051 }
3052 }
3053
3054 *next_stmt_id += 1;
3055
3056 ident_stack.push(inspect_ident);
3057 }
3058
3059 HydroNode::Unique { input, .. } => {
3060 let input_ident = ident_stack.pop().unwrap();
3061
3062 let unique_ident =
3063 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3064
3065 match builders_or_callback {
3066 BuildersOrCallback::Builders(graph_builders) => {
3067 let builder = graph_builders.get_dfir_mut(&out_location);
3068 let lifetime = if input.metadata().location_id.is_top_level() {
3069 quote!('static)
3070 } else {
3071 quote!('tick)
3072 };
3073
3074 builder.add_dfir(
3075 parse_quote! {
3076 #unique_ident = #input_ident -> unique::<#lifetime>();
3077 },
3078 None,
3079 Some(&next_stmt_id.to_string()),
3080 );
3081 }
3082 BuildersOrCallback::Callback(_, node_callback) => {
3083 node_callback(node, next_stmt_id);
3084 }
3085 }
3086
3087 *next_stmt_id += 1;
3088
3089 ident_stack.push(unique_ident);
3090 }
3091
3092 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3093 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3094 if input.metadata().location_id.is_top_level()
3095 && input.metadata().collection_kind.is_bounded()
3096 {
3097 parse_quote!(fold_no_replay)
3098 } else {
3099 parse_quote!(fold)
3100 }
3101 } else if matches!(node, HydroNode::Scan { .. }) {
3102 parse_quote!(scan)
3103 } else if let HydroNode::FoldKeyed { input, .. } = node {
3104 if input.metadata().location_id.is_top_level()
3105 && input.metadata().collection_kind.is_bounded()
3106 {
3107 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3108 } else {
3109 parse_quote!(fold_keyed)
3110 }
3111 } else {
3112 unreachable!()
3113 };
3114
3115 let (HydroNode::Fold { input, .. }
3116 | HydroNode::FoldKeyed { input, .. }
3117 | HydroNode::Scan { input, .. }) = node
3118 else {
3119 unreachable!()
3120 };
3121
3122 let lifetime = if input.metadata().location_id.is_top_level() {
3123 quote!('static)
3124 } else {
3125 quote!('tick)
3126 };
3127
3128 let input_ident = ident_stack.pop().unwrap();
3129
3130 let (HydroNode::Fold { init, acc, .. }
3131 | HydroNode::FoldKeyed { init, acc, .. }
3132 | HydroNode::Scan { init, acc, .. }) = &*node
3133 else {
3134 unreachable!()
3135 };
3136
3137 let fold_ident =
3138 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3139
3140 match builders_or_callback {
3141 BuildersOrCallback::Builders(graph_builders) => {
3142 if matches!(node, HydroNode::Fold { .. })
3143 && node.metadata().location_id.is_top_level()
3144 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3145 && graph_builders.singleton_intermediates()
3146 && !node.metadata().collection_kind.is_bounded()
3147 {
3148 let builder = graph_builders.get_dfir_mut(&out_location);
3149
3150 let acc: syn::Expr = parse_quote!({
3151 let mut __inner = #acc;
3152 move |__state, __value| {
3153 __inner(__state, __value);
3154 Some(__state.clone())
3155 }
3156 });
3157
3158 builder.add_dfir(
3159 parse_quote! {
3160 source_iter([(#init)()]) -> [0]#fold_ident;
3161 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3162 #fold_ident = chain();
3163 },
3164 None,
3165 Some(&next_stmt_id.to_string()),
3166 );
3167 } else if matches!(node, HydroNode::FoldKeyed { .. })
3168 && node.metadata().location_id.is_top_level()
3169 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3170 && graph_builders.singleton_intermediates()
3171 && !node.metadata().collection_kind.is_bounded()
3172 {
3173 let builder = graph_builders.get_dfir_mut(&out_location);
3174
3175 let acc: syn::Expr = parse_quote!({
3176 let mut __init = #init;
3177 let mut __inner = #acc;
3178 move |__state, __kv: (_, _)| {
3179 let __state = __state
3181 .entry(::std::clone::Clone::clone(&__kv.0))
3182 .or_insert_with(|| (__init)());
3183 __inner(__state, __kv.1);
3184 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3185 }
3186 });
3187
3188 builder.add_dfir(
3189 parse_quote! {
3190 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3191 },
3192 None,
3193 Some(&next_stmt_id.to_string()),
3194 );
3195 } else {
3196 let builder = graph_builders.get_dfir_mut(&out_location);
3197 builder.add_dfir(
3198 parse_quote! {
3199 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3200 },
3201 None,
3202 Some(&next_stmt_id.to_string()),
3203 );
3204 }
3205 }
3206 BuildersOrCallback::Callback(_, node_callback) => {
3207 node_callback(node, next_stmt_id);
3208 }
3209 }
3210
3211 *next_stmt_id += 1;
3212
3213 ident_stack.push(fold_ident);
3214 }
3215
3216 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3217 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3218 if input.metadata().location_id.is_top_level()
3219 && input.metadata().collection_kind.is_bounded()
3220 {
3221 parse_quote!(reduce_no_replay)
3222 } else {
3223 parse_quote!(reduce)
3224 }
3225 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3226 if input.metadata().location_id.is_top_level()
3227 && input.metadata().collection_kind.is_bounded()
3228 {
3229 todo!(
3230 "Calling keyed reduce on a top-level bounded collection is not supported"
3231 )
3232 } else {
3233 parse_quote!(reduce_keyed)
3234 }
3235 } else {
3236 unreachable!()
3237 };
3238
3239 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3240 else {
3241 unreachable!()
3242 };
3243
3244 let lifetime = if input.metadata().location_id.is_top_level() {
3245 quote!('static)
3246 } else {
3247 quote!('tick)
3248 };
3249
3250 let input_ident = ident_stack.pop().unwrap();
3251
3252 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3253 else {
3254 unreachable!()
3255 };
3256
3257 let reduce_ident =
3258 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3259
3260 match builders_or_callback {
3261 BuildersOrCallback::Builders(graph_builders) => {
3262 if matches!(node, HydroNode::Reduce { .. })
3263 && node.metadata().location_id.is_top_level()
3264 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3265 && graph_builders.singleton_intermediates()
3266 && !node.metadata().collection_kind.is_bounded()
3267 {
3268 todo!(
3269 "Reduce with optional intermediates is not yet supported in simulator"
3270 );
3271 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3272 && node.metadata().location_id.is_top_level()
3273 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3274 && graph_builders.singleton_intermediates()
3275 && !node.metadata().collection_kind.is_bounded()
3276 {
3277 todo!(
3278 "Reduce keyed with optional intermediates is not yet supported in simulator"
3279 );
3280 } else {
3281 let builder = graph_builders.get_dfir_mut(&out_location);
3282 builder.add_dfir(
3283 parse_quote! {
3284 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3285 },
3286 None,
3287 Some(&next_stmt_id.to_string()),
3288 );
3289 }
3290 }
3291 BuildersOrCallback::Callback(_, node_callback) => {
3292 node_callback(node, next_stmt_id);
3293 }
3294 }
3295
3296 *next_stmt_id += 1;
3297
3298 ident_stack.push(reduce_ident);
3299 }
3300
3301 HydroNode::ReduceKeyedWatermark {
3302 f,
3303 input,
3304 metadata,
3305 ..
3306 } => {
3307 let lifetime = if input.metadata().location_id.is_top_level() {
3308 quote!('static)
3309 } else {
3310 quote!('tick)
3311 };
3312
3313 let watermark_ident = ident_stack.pop().unwrap();
3315 let input_ident = ident_stack.pop().unwrap();
3316
3317 let chain_ident = syn::Ident::new(
3318 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3319 Span::call_site(),
3320 );
3321
3322 let fold_ident =
3323 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3324
3325 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3326 && input.metadata().collection_kind.is_bounded()
3327 {
3328 parse_quote!(fold_no_replay)
3329 } else {
3330 parse_quote!(fold)
3331 };
3332
3333 match builders_or_callback {
3334 BuildersOrCallback::Builders(graph_builders) => {
3335 if metadata.location_id.is_top_level()
3336 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3337 && graph_builders.singleton_intermediates()
3338 && !metadata.collection_kind.is_bounded()
3339 {
3340 todo!(
3341 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3342 )
3343 } else {
3344 let builder = graph_builders.get_dfir_mut(&out_location);
3345 builder.add_dfir(
3346 parse_quote! {
3347 #chain_ident = chain();
3348 #input_ident
3349 -> map(|x| (Some(x), None))
3350 -> [0]#chain_ident;
3351 #watermark_ident
3352 -> map(|watermark| (None, Some(watermark)))
3353 -> [1]#chain_ident;
3354
3355 #fold_ident = #chain_ident
3356 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3357 let __reduce_keyed_fn = #f;
3358 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3359 if let Some((k, v)) = opt_payload {
3360 if let Some(curr_watermark) = *opt_curr_watermark {
3361 if k <= curr_watermark {
3362 return;
3363 }
3364 }
3365 match map.entry(k) {
3366 ::std::collections::hash_map::Entry::Vacant(e) => {
3367 e.insert(v);
3368 }
3369 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3370 __reduce_keyed_fn(e.get_mut(), v);
3371 }
3372 }
3373 } else {
3374 let watermark = opt_watermark.unwrap();
3375 if let Some(curr_watermark) = *opt_curr_watermark {
3376 if watermark <= curr_watermark {
3377 return;
3378 }
3379 }
3380 *opt_curr_watermark = opt_watermark;
3381 map.retain(|k, _| *k > watermark);
3382 }
3383 }
3384 })
3385 -> flat_map(|(map, _curr_watermark)| map);
3386 },
3387 None,
3388 Some(&next_stmt_id.to_string()),
3389 );
3390 }
3391 }
3392 BuildersOrCallback::Callback(_, node_callback) => {
3393 node_callback(node, next_stmt_id);
3394 }
3395 }
3396
3397 *next_stmt_id += 1;
3398
3399 ident_stack.push(fold_ident);
3400 }
3401
3402 HydroNode::Network {
3403 serialize_fn: serialize_pipeline,
3404 instantiate_fn,
3405 deserialize_fn: deserialize_pipeline,
3406 input,
3407 ..
3408 } => {
3409 let input_ident = ident_stack.pop().unwrap();
3410
3411 let receiver_stream_ident =
3412 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3413
3414 match builders_or_callback {
3415 BuildersOrCallback::Builders(graph_builders) => {
3416 let (sink_expr, source_expr) = match instantiate_fn {
3417 DebugInstantiate::Building => (
3418 syn::parse_quote!(DUMMY_SINK),
3419 syn::parse_quote!(DUMMY_SOURCE),
3420 ),
3421
3422 DebugInstantiate::Finalized(finalized) => {
3423 (finalized.sink.clone(), finalized.source.clone())
3424 }
3425 };
3426
3427 graph_builders.create_network(
3428 &input.metadata().location_id,
3429 &out_location,
3430 input_ident,
3431 &receiver_stream_ident,
3432 serialize_pipeline.as_ref(),
3433 sink_expr,
3434 source_expr,
3435 deserialize_pipeline.as_ref(),
3436 *next_stmt_id,
3437 );
3438 }
3439 BuildersOrCallback::Callback(_, node_callback) => {
3440 node_callback(node, next_stmt_id);
3441 }
3442 }
3443
3444 *next_stmt_id += 1;
3445
3446 ident_stack.push(receiver_stream_ident);
3447 }
3448
3449 HydroNode::ExternalInput {
3450 instantiate_fn,
3451 deserialize_fn: deserialize_pipeline,
3452 ..
3453 } => {
3454 let receiver_stream_ident =
3455 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3456
3457 match builders_or_callback {
3458 BuildersOrCallback::Builders(graph_builders) => {
3459 let (_, source_expr) = match instantiate_fn {
3460 DebugInstantiate::Building => (
3461 syn::parse_quote!(DUMMY_SINK),
3462 syn::parse_quote!(DUMMY_SOURCE),
3463 ),
3464
3465 DebugInstantiate::Finalized(finalized) => {
3466 (finalized.sink.clone(), finalized.source.clone())
3467 }
3468 };
3469
3470 graph_builders.create_external_source(
3471 &out_location,
3472 source_expr,
3473 &receiver_stream_ident,
3474 deserialize_pipeline.as_ref(),
3475 *next_stmt_id,
3476 );
3477 }
3478 BuildersOrCallback::Callback(_, node_callback) => {
3479 node_callback(node, next_stmt_id);
3480 }
3481 }
3482
3483 *next_stmt_id += 1;
3484
3485 ident_stack.push(receiver_stream_ident);
3486 }
3487
3488 HydroNode::Counter {
3489 tag,
3490 duration,
3491 prefix,
3492 ..
3493 } => {
3494 let input_ident = ident_stack.pop().unwrap();
3495
3496 let counter_ident =
3497 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3498
3499 match builders_or_callback {
3500 BuildersOrCallback::Builders(graph_builders) => {
3501 let arg = format!("{}({})", prefix, tag);
3502 let builder = graph_builders.get_dfir_mut(&out_location);
3503 builder.add_dfir(
3504 parse_quote! {
3505 #counter_ident = #input_ident -> _counter(#arg, #duration);
3506 },
3507 None,
3508 Some(&next_stmt_id.to_string()),
3509 );
3510 }
3511 BuildersOrCallback::Callback(_, node_callback) => {
3512 node_callback(node, next_stmt_id);
3513 }
3514 }
3515
3516 *next_stmt_id += 1;
3517
3518 ident_stack.push(counter_ident);
3519 }
3520 }
3521 },
3522 seen_tees,
3523 false,
3524 );
3525
3526 ident_stack
3527 .pop()
3528 .expect("ident_stack should have exactly one element after traversal")
3529 }
3530
3531 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3532 match self {
3533 HydroNode::Placeholder => {
3534 panic!()
3535 }
3536 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3537 HydroNode::Source { source, .. } => match source {
3538 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3539 HydroSource::ExternalNetwork()
3540 | HydroSource::Spin()
3541 | HydroSource::ClusterMembers(_)
3542 | HydroSource::Embedded(_) => {} },
3544 HydroNode::SingletonSource { value, .. } => {
3545 transform(value);
3546 }
3547 HydroNode::CycleSource { .. }
3548 | HydroNode::Tee { .. }
3549 | HydroNode::YieldConcat { .. }
3550 | HydroNode::BeginAtomic { .. }
3551 | HydroNode::EndAtomic { .. }
3552 | HydroNode::Batch { .. }
3553 | HydroNode::Chain { .. }
3554 | HydroNode::ChainFirst { .. }
3555 | HydroNode::CrossProduct { .. }
3556 | HydroNode::CrossSingleton { .. }
3557 | HydroNode::ResolveFutures { .. }
3558 | HydroNode::ResolveFuturesOrdered { .. }
3559 | HydroNode::Join { .. }
3560 | HydroNode::Difference { .. }
3561 | HydroNode::AntiJoin { .. }
3562 | HydroNode::DeferTick { .. }
3563 | HydroNode::Enumerate { .. }
3564 | HydroNode::Unique { .. }
3565 | HydroNode::Sort { .. } => {}
3566 HydroNode::Map { f, .. }
3567 | HydroNode::FlatMap { f, .. }
3568 | HydroNode::Filter { f, .. }
3569 | HydroNode::FilterMap { f, .. }
3570 | HydroNode::Inspect { f, .. }
3571 | HydroNode::Reduce { f, .. }
3572 | HydroNode::ReduceKeyed { f, .. }
3573 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3574 transform(f);
3575 }
3576 HydroNode::Fold { init, acc, .. }
3577 | HydroNode::Scan { init, acc, .. }
3578 | HydroNode::FoldKeyed { init, acc, .. } => {
3579 transform(init);
3580 transform(acc);
3581 }
3582 HydroNode::Network {
3583 serialize_fn,
3584 deserialize_fn,
3585 ..
3586 } => {
3587 if let Some(serialize_fn) = serialize_fn {
3588 transform(serialize_fn);
3589 }
3590 if let Some(deserialize_fn) = deserialize_fn {
3591 transform(deserialize_fn);
3592 }
3593 }
3594 HydroNode::ExternalInput { deserialize_fn, .. } => {
3595 if let Some(deserialize_fn) = deserialize_fn {
3596 transform(deserialize_fn);
3597 }
3598 }
3599 HydroNode::Counter { duration, .. } => {
3600 transform(duration);
3601 }
3602 }
3603 }
3604
3605 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3606 &self.metadata().op
3607 }
3608
3609 pub fn metadata(&self) -> &HydroIrMetadata {
3610 match self {
3611 HydroNode::Placeholder => {
3612 panic!()
3613 }
3614 HydroNode::Cast { metadata, .. } => metadata,
3615 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3616 HydroNode::Source { metadata, .. } => metadata,
3617 HydroNode::SingletonSource { metadata, .. } => metadata,
3618 HydroNode::CycleSource { metadata, .. } => metadata,
3619 HydroNode::Tee { metadata, .. } => metadata,
3620 HydroNode::YieldConcat { metadata, .. } => metadata,
3621 HydroNode::BeginAtomic { metadata, .. } => metadata,
3622 HydroNode::EndAtomic { metadata, .. } => metadata,
3623 HydroNode::Batch { metadata, .. } => metadata,
3624 HydroNode::Chain { metadata, .. } => metadata,
3625 HydroNode::ChainFirst { metadata, .. } => metadata,
3626 HydroNode::CrossProduct { metadata, .. } => metadata,
3627 HydroNode::CrossSingleton { metadata, .. } => metadata,
3628 HydroNode::Join { metadata, .. } => metadata,
3629 HydroNode::Difference { metadata, .. } => metadata,
3630 HydroNode::AntiJoin { metadata, .. } => metadata,
3631 HydroNode::ResolveFutures { metadata, .. } => metadata,
3632 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3633 HydroNode::Map { metadata, .. } => metadata,
3634 HydroNode::FlatMap { metadata, .. } => metadata,
3635 HydroNode::Filter { metadata, .. } => metadata,
3636 HydroNode::FilterMap { metadata, .. } => metadata,
3637 HydroNode::DeferTick { metadata, .. } => metadata,
3638 HydroNode::Enumerate { metadata, .. } => metadata,
3639 HydroNode::Inspect { metadata, .. } => metadata,
3640 HydroNode::Unique { metadata, .. } => metadata,
3641 HydroNode::Sort { metadata, .. } => metadata,
3642 HydroNode::Scan { metadata, .. } => metadata,
3643 HydroNode::Fold { metadata, .. } => metadata,
3644 HydroNode::FoldKeyed { metadata, .. } => metadata,
3645 HydroNode::Reduce { metadata, .. } => metadata,
3646 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3647 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3648 HydroNode::ExternalInput { metadata, .. } => metadata,
3649 HydroNode::Network { metadata, .. } => metadata,
3650 HydroNode::Counter { metadata, .. } => metadata,
3651 }
3652 }
3653
3654 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3655 &mut self.metadata_mut().op
3656 }
3657
3658 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3659 match self {
3660 HydroNode::Placeholder => {
3661 panic!()
3662 }
3663 HydroNode::Cast { metadata, .. } => metadata,
3664 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3665 HydroNode::Source { metadata, .. } => metadata,
3666 HydroNode::SingletonSource { metadata, .. } => metadata,
3667 HydroNode::CycleSource { metadata, .. } => metadata,
3668 HydroNode::Tee { metadata, .. } => metadata,
3669 HydroNode::YieldConcat { metadata, .. } => metadata,
3670 HydroNode::BeginAtomic { metadata, .. } => metadata,
3671 HydroNode::EndAtomic { metadata, .. } => metadata,
3672 HydroNode::Batch { metadata, .. } => metadata,
3673 HydroNode::Chain { metadata, .. } => metadata,
3674 HydroNode::ChainFirst { metadata, .. } => metadata,
3675 HydroNode::CrossProduct { metadata, .. } => metadata,
3676 HydroNode::CrossSingleton { metadata, .. } => metadata,
3677 HydroNode::Join { metadata, .. } => metadata,
3678 HydroNode::Difference { metadata, .. } => metadata,
3679 HydroNode::AntiJoin { metadata, .. } => metadata,
3680 HydroNode::ResolveFutures { metadata, .. } => metadata,
3681 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3682 HydroNode::Map { metadata, .. } => metadata,
3683 HydroNode::FlatMap { metadata, .. } => metadata,
3684 HydroNode::Filter { metadata, .. } => metadata,
3685 HydroNode::FilterMap { metadata, .. } => metadata,
3686 HydroNode::DeferTick { metadata, .. } => metadata,
3687 HydroNode::Enumerate { metadata, .. } => metadata,
3688 HydroNode::Inspect { metadata, .. } => metadata,
3689 HydroNode::Unique { metadata, .. } => metadata,
3690 HydroNode::Sort { metadata, .. } => metadata,
3691 HydroNode::Scan { metadata, .. } => metadata,
3692 HydroNode::Fold { metadata, .. } => metadata,
3693 HydroNode::FoldKeyed { metadata, .. } => metadata,
3694 HydroNode::Reduce { metadata, .. } => metadata,
3695 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3696 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3697 HydroNode::ExternalInput { metadata, .. } => metadata,
3698 HydroNode::Network { metadata, .. } => metadata,
3699 HydroNode::Counter { metadata, .. } => metadata,
3700 }
3701 }
3702
3703 pub fn input(&self) -> Vec<&HydroNode> {
3704 match self {
3705 HydroNode::Placeholder => {
3706 panic!()
3707 }
3708 HydroNode::Source { .. }
3709 | HydroNode::SingletonSource { .. }
3710 | HydroNode::ExternalInput { .. }
3711 | HydroNode::CycleSource { .. }
3712 | HydroNode::Tee { .. } => {
3713 vec![]
3715 }
3716 HydroNode::Cast { inner, .. }
3717 | HydroNode::ObserveNonDet { inner, .. }
3718 | HydroNode::YieldConcat { inner, .. }
3719 | HydroNode::BeginAtomic { inner, .. }
3720 | HydroNode::EndAtomic { inner, .. }
3721 | HydroNode::Batch { inner, .. } => {
3722 vec![inner]
3723 }
3724 HydroNode::Chain { first, second, .. } => {
3725 vec![first, second]
3726 }
3727 HydroNode::ChainFirst { first, second, .. } => {
3728 vec![first, second]
3729 }
3730 HydroNode::CrossProduct { left, right, .. }
3731 | HydroNode::CrossSingleton { left, right, .. }
3732 | HydroNode::Join { left, right, .. } => {
3733 vec![left, right]
3734 }
3735 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3736 vec![pos, neg]
3737 }
3738 HydroNode::Map { input, .. }
3739 | HydroNode::FlatMap { input, .. }
3740 | HydroNode::Filter { input, .. }
3741 | HydroNode::FilterMap { input, .. }
3742 | HydroNode::Sort { input, .. }
3743 | HydroNode::DeferTick { input, .. }
3744 | HydroNode::Enumerate { input, .. }
3745 | HydroNode::Inspect { input, .. }
3746 | HydroNode::Unique { input, .. }
3747 | HydroNode::Network { input, .. }
3748 | HydroNode::Counter { input, .. }
3749 | HydroNode::ResolveFutures { input, .. }
3750 | HydroNode::ResolveFuturesOrdered { input, .. }
3751 | HydroNode::Fold { input, .. }
3752 | HydroNode::FoldKeyed { input, .. }
3753 | HydroNode::Reduce { input, .. }
3754 | HydroNode::ReduceKeyed { input, .. }
3755 | HydroNode::Scan { input, .. } => {
3756 vec![input]
3757 }
3758 HydroNode::ReduceKeyedWatermark {
3759 input, watermark, ..
3760 } => {
3761 vec![input, watermark]
3762 }
3763 }
3764 }
3765
3766 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3767 self.input()
3768 .iter()
3769 .map(|input_node| input_node.metadata())
3770 .collect()
3771 }
3772
3773 pub fn print_root(&self) -> String {
3774 match self {
3775 HydroNode::Placeholder => {
3776 panic!()
3777 }
3778 HydroNode::Cast { .. } => "Cast()".to_owned(),
3779 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3780 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3781 HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3782 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3783 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3784 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3785 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3786 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3787 HydroNode::Batch { .. } => "Batch()".to_owned(),
3788 HydroNode::Chain { first, second, .. } => {
3789 format!("Chain({}, {})", first.print_root(), second.print_root())
3790 }
3791 HydroNode::ChainFirst { first, second, .. } => {
3792 format!(
3793 "ChainFirst({}, {})",
3794 first.print_root(),
3795 second.print_root()
3796 )
3797 }
3798 HydroNode::CrossProduct { left, right, .. } => {
3799 format!(
3800 "CrossProduct({}, {})",
3801 left.print_root(),
3802 right.print_root()
3803 )
3804 }
3805 HydroNode::CrossSingleton { left, right, .. } => {
3806 format!(
3807 "CrossSingleton({}, {})",
3808 left.print_root(),
3809 right.print_root()
3810 )
3811 }
3812 HydroNode::Join { left, right, .. } => {
3813 format!("Join({}, {})", left.print_root(), right.print_root())
3814 }
3815 HydroNode::Difference { pos, neg, .. } => {
3816 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3817 }
3818 HydroNode::AntiJoin { pos, neg, .. } => {
3819 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3820 }
3821 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3822 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3823 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3824 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3825 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3826 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3827 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3828 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3829 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3830 HydroNode::Unique { .. } => "Unique()".to_owned(),
3831 HydroNode::Sort { .. } => "Sort()".to_owned(),
3832 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3833 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3834 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3835 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3836 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3837 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3838 HydroNode::Network { .. } => "Network()".to_owned(),
3839 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3840 HydroNode::Counter { tag, duration, .. } => {
3841 format!("Counter({:?}, {:?})", tag, duration)
3842 }
3843 }
3844 }
3845}
3846
3847#[cfg(feature = "build")]
3848fn instantiate_network<'a, D>(
3849 from_location: &LocationId,
3850 to_location: &LocationId,
3851 processes: &SparseSecondaryMap<LocationKey, D::Process>,
3852 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
3853) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
3854where
3855 D: Deploy<'a>,
3856{
3857 let ((sink, source), connect_fn) = match (from_location, to_location) {
3858 (&LocationId::Process(from), &LocationId::Process(to)) => {
3859 let from_node = processes
3860 .get(from)
3861 .unwrap_or_else(|| {
3862 panic!("A process used in the graph was not instantiated: {}", from)
3863 })
3864 .clone();
3865 let to_node = processes
3866 .get(to)
3867 .unwrap_or_else(|| {
3868 panic!("A process used in the graph was not instantiated: {}", to)
3869 })
3870 .clone();
3871
3872 let sink_port = from_node.next_port();
3873 let source_port = to_node.next_port();
3874
3875 (
3876 D::o2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
3877 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
3878 )
3879 }
3880 (&LocationId::Process(from), &LocationId::Cluster(to)) => {
3881 let from_node = processes
3882 .get(from)
3883 .unwrap_or_else(|| {
3884 panic!("A process used in the graph was not instantiated: {}", from)
3885 })
3886 .clone();
3887 let to_node = clusters
3888 .get(to)
3889 .unwrap_or_else(|| {
3890 panic!("A cluster used in the graph was not instantiated: {}", to)
3891 })
3892 .clone();
3893
3894 let sink_port = from_node.next_port();
3895 let source_port = to_node.next_port();
3896
3897 (
3898 D::o2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
3899 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
3900 )
3901 }
3902 (&LocationId::Cluster(from), &LocationId::Process(to)) => {
3903 let from_node = clusters
3904 .get(from)
3905 .unwrap_or_else(|| {
3906 panic!("A cluster used in the graph was not instantiated: {}", from)
3907 })
3908 .clone();
3909 let to_node = processes
3910 .get(to)
3911 .unwrap_or_else(|| {
3912 panic!("A process used in the graph was not instantiated: {}", to)
3913 })
3914 .clone();
3915
3916 let sink_port = from_node.next_port();
3917 let source_port = to_node.next_port();
3918
3919 (
3920 D::m2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
3921 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
3922 )
3923 }
3924 (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
3925 let from_node = clusters
3926 .get(from)
3927 .unwrap_or_else(|| {
3928 panic!("A cluster used in the graph was not instantiated: {}", from)
3929 })
3930 .clone();
3931 let to_node = clusters
3932 .get(to)
3933 .unwrap_or_else(|| {
3934 panic!("A cluster used in the graph was not instantiated: {}", to)
3935 })
3936 .clone();
3937
3938 let sink_port = from_node.next_port();
3939 let source_port = to_node.next_port();
3940
3941 (
3942 D::m2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
3943 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
3944 )
3945 }
3946 (LocationId::Tick(_, _), _) => panic!(),
3947 (_, LocationId::Tick(_, _)) => panic!(),
3948 (LocationId::Atomic(_), _) => panic!(),
3949 (_, LocationId::Atomic(_)) => panic!(),
3950 };
3951 (sink, source, connect_fn)
3952}
3953
3954#[cfg(test)]
3955mod test {
3956 use std::mem::size_of;
3957
3958 use stageleft::{QuotedWithContext, q};
3959
3960 use super::*;
3961
3962 #[test]
3963 #[cfg_attr(
3964 not(feature = "build"),
3965 ignore = "expects inclusion of feature-gated fields"
3966 )]
3967 fn hydro_node_size() {
3968 assert_eq!(size_of::<HydroNode>(), 240);
3969 }
3970
3971 #[test]
3972 #[cfg_attr(
3973 not(feature = "build"),
3974 ignore = "expects inclusion of feature-gated fields"
3975 )]
3976 fn hydro_root_size() {
3977 assert_eq!(size_of::<HydroRoot>(), 136);
3978 }
3979
3980 #[test]
3981 fn test_simplify_q_macro_basic() {
3982 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
3984 let result = simplify_q_macro(simple_expr.clone());
3985 assert_eq!(result, simple_expr);
3986 }
3987
3988 #[test]
3989 fn test_simplify_q_macro_actual_stageleft_call() {
3990 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
3992 let result = simplify_q_macro(stageleft_call);
3993 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3996 }
3997
3998 #[test]
3999 fn test_closure_no_pipe_at_start() {
4000 let stageleft_call = q!({
4002 let foo = 123;
4003 move |b: usize| b + foo
4004 })
4005 .splice_fn1_ctx(&());
4006 let result = simplify_q_macro(stageleft_call);
4007 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4008 }
4009}