1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23use syn::visit::{self, Visit};
24use syn::visit_mut::VisitMut;
25
26use crate::compile::builder::{CycleId, ExternalPortId};
27#[cfg(feature = "build")]
28use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
29use crate::location::dynamic::LocationId;
30use crate::location::{LocationKey, NetworkHint};
31
32pub mod backtrace;
33use backtrace::Backtrace;
34
35#[derive(Clone, Hash)]
39pub struct DebugExpr(pub Box<syn::Expr>);
40
41impl From<syn::Expr> for DebugExpr {
42 fn from(expr: syn::Expr) -> Self {
43 Self(Box::new(expr))
44 }
45}
46
47impl Deref for DebugExpr {
48 type Target = syn::Expr;
49
50 fn deref(&self) -> &Self::Target {
51 &self.0
52 }
53}
54
55impl ToTokens for DebugExpr {
56 fn to_tokens(&self, tokens: &mut TokenStream) {
57 self.0.to_tokens(tokens);
58 }
59}
60
61impl Debug for DebugExpr {
62 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
63 write!(f, "{}", self.0.to_token_stream())
64 }
65}
66
67impl Display for DebugExpr {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 let original = self.0.as_ref().clone();
70 let simplified = simplify_q_macro(original);
71
72 write!(f, "q!({})", quote::quote!(#simplified))
75 }
76}
77
78fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
80 let mut simplifier = QMacroSimplifier::new();
83 simplifier.visit_expr_mut(&mut expr);
84
85 if let Some(simplified) = simplifier.simplified_result {
87 simplified
88 } else {
89 expr
90 }
91}
92
93#[derive(Default)]
95pub struct QMacroSimplifier {
96 pub simplified_result: Option<syn::Expr>,
97}
98
99impl QMacroSimplifier {
100 pub fn new() -> Self {
101 Self::default()
102 }
103}
104
105impl VisitMut for QMacroSimplifier {
106 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
107 if self.simplified_result.is_some() {
109 return;
110 }
111
112 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
113 && self.is_stageleft_runtime_support_call(&path_expr.path)
115 && let Some(closure) = self.extract_closure_from_args(&call.args)
117 {
118 self.simplified_result = Some(closure);
119 return;
120 }
121
122 syn::visit_mut::visit_expr_mut(self, expr);
125 }
126}
127
128impl QMacroSimplifier {
129 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
130 if let Some(last_segment) = path.segments.last() {
132 let fn_name = last_segment.ident.to_string();
133 fn_name.contains("_type_hint")
135 && path.segments.len() > 2
136 && path.segments[0].ident == "stageleft"
137 && path.segments[1].ident == "runtime_support"
138 } else {
139 false
140 }
141 }
142
143 fn extract_closure_from_args(
144 &self,
145 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
146 ) -> Option<syn::Expr> {
147 for arg in args {
149 if let syn::Expr::Closure(_) = arg {
150 return Some(arg.clone());
151 }
152 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
154 return Some(closure_expr);
155 }
156 }
157 None
158 }
159
160 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
161 let mut visitor = ClosureFinder {
162 found_closure: None,
163 prefer_inner_blocks: true,
164 };
165 visitor.visit_expr(expr);
166 visitor.found_closure
167 }
168}
169
170struct ClosureFinder {
172 found_closure: Option<syn::Expr>,
173 prefer_inner_blocks: bool,
174}
175
176impl<'ast> Visit<'ast> for ClosureFinder {
177 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
178 if self.found_closure.is_some() {
180 return;
181 }
182
183 match expr {
184 syn::Expr::Closure(_) => {
185 self.found_closure = Some(expr.clone());
186 }
187 syn::Expr::Block(block) if self.prefer_inner_blocks => {
188 for stmt in &block.block.stmts {
190 if let syn::Stmt::Expr(stmt_expr, _) = stmt
191 && let syn::Expr::Block(_) = stmt_expr
192 {
193 let mut inner_visitor = ClosureFinder {
195 found_closure: None,
196 prefer_inner_blocks: false, };
198 inner_visitor.visit_expr(stmt_expr);
199 if inner_visitor.found_closure.is_some() {
200 self.found_closure = Some(stmt_expr.clone());
202 return;
203 }
204 }
205 }
206
207 visit::visit_expr(self, expr);
209
210 if self.found_closure.is_some() {
213 }
215 }
216 _ => {
217 visit::visit_expr(self, expr);
219 }
220 }
221 }
222}
223
224#[derive(Clone, PartialEq, Eq, Hash)]
228pub struct DebugType(pub Box<syn::Type>);
229
230impl From<syn::Type> for DebugType {
231 fn from(t: syn::Type) -> Self {
232 Self(Box::new(t))
233 }
234}
235
236impl Deref for DebugType {
237 type Target = syn::Type;
238
239 fn deref(&self) -> &Self::Target {
240 &self.0
241 }
242}
243
244impl ToTokens for DebugType {
245 fn to_tokens(&self, tokens: &mut TokenStream) {
246 self.0.to_tokens(tokens);
247 }
248}
249
250impl Debug for DebugType {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 write!(f, "{}", self.0.to_token_stream())
253 }
254}
255
256pub enum DebugInstantiate {
257 Building,
258 Finalized(Box<DebugInstantiateFinalized>),
259}
260
261#[cfg_attr(
262 not(feature = "build"),
263 expect(
264 dead_code,
265 reason = "sink, source unused without `feature = \"build\"`."
266 )
267)]
268pub struct DebugInstantiateFinalized {
269 sink: syn::Expr,
270 source: syn::Expr,
271 connect_fn: Option<Box<dyn FnOnce()>>,
272}
273
274impl From<DebugInstantiateFinalized> for DebugInstantiate {
275 fn from(f: DebugInstantiateFinalized) -> Self {
276 Self::Finalized(Box::new(f))
277 }
278}
279
280impl Debug for DebugInstantiate {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 write!(f, "<network instantiate>")
283 }
284}
285
286impl Hash for DebugInstantiate {
287 fn hash<H: Hasher>(&self, _state: &mut H) {
288 }
290}
291
292impl Clone for DebugInstantiate {
293 fn clone(&self) -> Self {
294 match self {
295 DebugInstantiate::Building => DebugInstantiate::Building,
296 DebugInstantiate::Finalized(_) => {
297 panic!("DebugInstantiate::Finalized should not be cloned")
298 }
299 }
300 }
301}
302
303#[derive(Debug, Hash, Clone)]
312pub enum ClusterMembersState {
313 Uninit,
315 Stream(DebugExpr),
318 Tee(LocationId, LocationId),
322}
323
324#[derive(Debug, Hash, Clone)]
326pub enum HydroSource {
327 Stream(DebugExpr),
328 ExternalNetwork(),
329 Iter(DebugExpr),
330 Spin(),
331 ClusterMembers(LocationId, ClusterMembersState),
332 Embedded(syn::Ident),
333}
334
335#[cfg(feature = "build")]
336pub trait DfirBuilder {
342 fn singleton_intermediates(&self) -> bool;
344
345 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
347
348 fn batch(
349 &mut self,
350 in_ident: syn::Ident,
351 in_location: &LocationId,
352 in_kind: &CollectionKind,
353 out_ident: &syn::Ident,
354 out_location: &LocationId,
355 op_meta: &HydroIrOpMetadata,
356 );
357 fn yield_from_tick(
358 &mut self,
359 in_ident: syn::Ident,
360 in_location: &LocationId,
361 in_kind: &CollectionKind,
362 out_ident: &syn::Ident,
363 out_location: &LocationId,
364 );
365
366 fn begin_atomic(
367 &mut self,
368 in_ident: syn::Ident,
369 in_location: &LocationId,
370 in_kind: &CollectionKind,
371 out_ident: &syn::Ident,
372 out_location: &LocationId,
373 op_meta: &HydroIrOpMetadata,
374 );
375 fn end_atomic(
376 &mut self,
377 in_ident: syn::Ident,
378 in_location: &LocationId,
379 in_kind: &CollectionKind,
380 out_ident: &syn::Ident,
381 );
382
383 #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
384 fn observe_nondet(
385 &mut self,
386 trusted: bool,
387 location: &LocationId,
388 in_ident: syn::Ident,
389 in_kind: &CollectionKind,
390 out_ident: &syn::Ident,
391 out_kind: &CollectionKind,
392 op_meta: &HydroIrOpMetadata,
393 );
394
395 #[expect(clippy::too_many_arguments, reason = "TODO")]
396 fn create_network(
397 &mut self,
398 from: &LocationId,
399 to: &LocationId,
400 input_ident: syn::Ident,
401 out_ident: &syn::Ident,
402 serialize: Option<&DebugExpr>,
403 sink: syn::Expr,
404 source: syn::Expr,
405 deserialize: Option<&DebugExpr>,
406 tag_id: usize,
407 networking_info: &crate::networking::NetworkingInfo,
408 );
409
410 fn create_external_source(
411 &mut self,
412 on: &LocationId,
413 source_expr: syn::Expr,
414 out_ident: &syn::Ident,
415 deserialize: Option<&DebugExpr>,
416 tag_id: usize,
417 );
418
419 fn create_external_output(
420 &mut self,
421 on: &LocationId,
422 sink_expr: syn::Expr,
423 input_ident: &syn::Ident,
424 serialize: Option<&DebugExpr>,
425 tag_id: usize,
426 );
427}
428
429#[cfg(feature = "build")]
430impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
431 fn singleton_intermediates(&self) -> bool {
432 false
433 }
434
435 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
436 self.entry(location.root().key())
437 .expect("location was removed")
438 .or_default()
439 }
440
441 fn batch(
442 &mut self,
443 in_ident: syn::Ident,
444 in_location: &LocationId,
445 in_kind: &CollectionKind,
446 out_ident: &syn::Ident,
447 _out_location: &LocationId,
448 _op_meta: &HydroIrOpMetadata,
449 ) {
450 let builder = self.get_dfir_mut(in_location.root());
451 if in_kind.is_bounded()
452 && matches!(
453 in_kind,
454 CollectionKind::Singleton { .. }
455 | CollectionKind::Optional { .. }
456 | CollectionKind::KeyedSingleton { .. }
457 )
458 {
459 assert!(in_location.is_top_level());
460 builder.add_dfir(
461 parse_quote! {
462 #out_ident = #in_ident -> persist::<'static>();
463 },
464 None,
465 None,
466 );
467 } else {
468 builder.add_dfir(
469 parse_quote! {
470 #out_ident = #in_ident;
471 },
472 None,
473 None,
474 );
475 }
476 }
477
478 fn yield_from_tick(
479 &mut self,
480 in_ident: syn::Ident,
481 in_location: &LocationId,
482 _in_kind: &CollectionKind,
483 out_ident: &syn::Ident,
484 _out_location: &LocationId,
485 ) {
486 let builder = self.get_dfir_mut(in_location.root());
487 builder.add_dfir(
488 parse_quote! {
489 #out_ident = #in_ident;
490 },
491 None,
492 None,
493 );
494 }
495
496 fn begin_atomic(
497 &mut self,
498 in_ident: syn::Ident,
499 in_location: &LocationId,
500 _in_kind: &CollectionKind,
501 out_ident: &syn::Ident,
502 _out_location: &LocationId,
503 _op_meta: &HydroIrOpMetadata,
504 ) {
505 let builder = self.get_dfir_mut(in_location.root());
506 builder.add_dfir(
507 parse_quote! {
508 #out_ident = #in_ident;
509 },
510 None,
511 None,
512 );
513 }
514
515 fn end_atomic(
516 &mut self,
517 in_ident: syn::Ident,
518 in_location: &LocationId,
519 _in_kind: &CollectionKind,
520 out_ident: &syn::Ident,
521 ) {
522 let builder = self.get_dfir_mut(in_location.root());
523 builder.add_dfir(
524 parse_quote! {
525 #out_ident = #in_ident;
526 },
527 None,
528 None,
529 );
530 }
531
532 fn observe_nondet(
533 &mut self,
534 _trusted: bool,
535 location: &LocationId,
536 in_ident: syn::Ident,
537 _in_kind: &CollectionKind,
538 out_ident: &syn::Ident,
539 _out_kind: &CollectionKind,
540 _op_meta: &HydroIrOpMetadata,
541 ) {
542 let builder = self.get_dfir_mut(location);
543 builder.add_dfir(
544 parse_quote! {
545 #out_ident = #in_ident;
546 },
547 None,
548 None,
549 );
550 }
551
552 fn create_network(
553 &mut self,
554 from: &LocationId,
555 to: &LocationId,
556 input_ident: syn::Ident,
557 out_ident: &syn::Ident,
558 serialize: Option<&DebugExpr>,
559 sink: syn::Expr,
560 source: syn::Expr,
561 deserialize: Option<&DebugExpr>,
562 tag_id: usize,
563 _networking_info: &crate::networking::NetworkingInfo,
564 ) {
565 let sender_builder = self.get_dfir_mut(from);
566 if let Some(serialize_pipeline) = serialize {
567 sender_builder.add_dfir(
568 parse_quote! {
569 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
570 },
571 None,
572 Some(&format!("send{}", tag_id)),
574 );
575 } else {
576 sender_builder.add_dfir(
577 parse_quote! {
578 #input_ident -> dest_sink(#sink);
579 },
580 None,
581 Some(&format!("send{}", tag_id)),
582 );
583 }
584
585 let receiver_builder = self.get_dfir_mut(to);
586 if let Some(deserialize_pipeline) = deserialize {
587 receiver_builder.add_dfir(
588 parse_quote! {
589 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
590 },
591 None,
592 Some(&format!("recv{}", tag_id)),
593 );
594 } else {
595 receiver_builder.add_dfir(
596 parse_quote! {
597 #out_ident = source_stream(#source);
598 },
599 None,
600 Some(&format!("recv{}", tag_id)),
601 );
602 }
603 }
604
605 fn create_external_source(
606 &mut self,
607 on: &LocationId,
608 source_expr: syn::Expr,
609 out_ident: &syn::Ident,
610 deserialize: Option<&DebugExpr>,
611 tag_id: usize,
612 ) {
613 let receiver_builder = self.get_dfir_mut(on);
614 if let Some(deserialize_pipeline) = deserialize {
615 receiver_builder.add_dfir(
616 parse_quote! {
617 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
618 },
619 None,
620 Some(&format!("recv{}", tag_id)),
621 );
622 } else {
623 receiver_builder.add_dfir(
624 parse_quote! {
625 #out_ident = source_stream(#source_expr);
626 },
627 None,
628 Some(&format!("recv{}", tag_id)),
629 );
630 }
631 }
632
633 fn create_external_output(
634 &mut self,
635 on: &LocationId,
636 sink_expr: syn::Expr,
637 input_ident: &syn::Ident,
638 serialize: Option<&DebugExpr>,
639 tag_id: usize,
640 ) {
641 let sender_builder = self.get_dfir_mut(on);
642 if let Some(serialize_fn) = serialize {
643 sender_builder.add_dfir(
644 parse_quote! {
645 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
646 },
647 None,
648 Some(&format!("send{}", tag_id)),
650 );
651 } else {
652 sender_builder.add_dfir(
653 parse_quote! {
654 #input_ident -> dest_sink(#sink_expr);
655 },
656 None,
657 Some(&format!("send{}", tag_id)),
658 );
659 }
660 }
661}
662
663#[cfg(feature = "build")]
664pub enum BuildersOrCallback<'a, L, N>
665where
666 L: FnMut(&mut HydroRoot, &mut usize),
667 N: FnMut(&mut HydroNode, &mut usize),
668{
669 Builders(&'a mut dyn DfirBuilder),
670 Callback(L, N),
671}
672
673#[derive(Debug, Hash)]
677pub enum HydroRoot {
678 ForEach {
679 f: DebugExpr,
680 input: Box<HydroNode>,
681 op_metadata: HydroIrOpMetadata,
682 },
683 SendExternal {
684 to_external_key: LocationKey,
685 to_port_id: ExternalPortId,
686 to_many: bool,
687 unpaired: bool,
688 serialize_fn: Option<DebugExpr>,
689 instantiate_fn: DebugInstantiate,
690 input: Box<HydroNode>,
691 op_metadata: HydroIrOpMetadata,
692 },
693 DestSink {
694 sink: DebugExpr,
695 input: Box<HydroNode>,
696 op_metadata: HydroIrOpMetadata,
697 },
698 CycleSink {
699 cycle_id: CycleId,
700 input: Box<HydroNode>,
701 op_metadata: HydroIrOpMetadata,
702 },
703 EmbeddedOutput {
704 ident: syn::Ident,
705 input: Box<HydroNode>,
706 op_metadata: HydroIrOpMetadata,
707 },
708}
709
710impl HydroRoot {
711 #[cfg(feature = "build")]
712 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
713 pub fn compile_network<'a, D>(
714 &mut self,
715 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
716 seen_tees: &mut SeenTees,
717 seen_cluster_members: &mut HashSet<(LocationId, LocationId)>,
718 processes: &SparseSecondaryMap<LocationKey, D::Process>,
719 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
720 externals: &SparseSecondaryMap<LocationKey, D::External>,
721 env: &mut D::InstantiateEnv,
722 ) where
723 D: Deploy<'a>,
724 {
725 let refcell_extra_stmts = RefCell::new(extra_stmts);
726 let refcell_env = RefCell::new(env);
727 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
728 self.transform_bottom_up(
729 &mut |l| {
730 if let HydroRoot::SendExternal {
731 input,
732 to_external_key,
733 to_port_id,
734 to_many,
735 unpaired,
736 instantiate_fn,
737 ..
738 } = l
739 {
740 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
741 DebugInstantiate::Building => {
742 let to_node = externals
743 .get(*to_external_key)
744 .unwrap_or_else(|| {
745 panic!("A external used in the graph was not instantiated: {}", to_external_key)
746 })
747 .clone();
748
749 match input.metadata().location_id.root() {
750 &LocationId::Process(process_key) => {
751 if *to_many {
752 (
753 (
754 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
755 parse_quote!(DUMMY),
756 ),
757 Box::new(|| {}) as Box<dyn FnOnce()>,
758 )
759 } else {
760 let from_node = processes
761 .get(process_key)
762 .unwrap_or_else(|| {
763 panic!("A process used in the graph was not instantiated: {}", process_key)
764 })
765 .clone();
766
767 let sink_port = from_node.next_port();
768 let source_port = to_node.next_port();
769
770 if *unpaired {
771 use stageleft::quote_type;
772 use tokio_util::codec::LengthDelimitedCodec;
773
774 to_node.register(*to_port_id, source_port.clone());
775
776 let _ = D::e2o_source(
777 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
778 &to_node, &source_port,
779 &from_node, &sink_port,
780 "e_type::<LengthDelimitedCodec>(),
781 format!("{}_{}", *to_external_key, *to_port_id)
782 );
783 }
784
785 (
786 (
787 D::o2e_sink(
788 &from_node,
789 &sink_port,
790 &to_node,
791 &source_port,
792 format!("{}_{}", *to_external_key, *to_port_id)
793 ),
794 parse_quote!(DUMMY),
795 ),
796 if *unpaired {
797 D::e2o_connect(
798 &to_node,
799 &source_port,
800 &from_node,
801 &sink_port,
802 *to_many,
803 NetworkHint::Auto,
804 )
805 } else {
806 Box::new(|| {}) as Box<dyn FnOnce()>
807 },
808 )
809 }
810 }
811 LocationId::Cluster(_) => todo!(),
812 _ => panic!()
813 }
814 },
815
816 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
817 };
818
819 *instantiate_fn = DebugInstantiateFinalized {
820 sink: sink_expr,
821 source: source_expr,
822 connect_fn: Some(connect_fn),
823 }
824 .into();
825 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
826 let element_type = match &input.metadata().collection_kind {
827 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
828 _ => panic!("Embedded output must have Stream collection kind"),
829 };
830 let location_key = match input.metadata().location_id.root() {
831 LocationId::Process(key) | LocationId::Cluster(key) => *key,
832 _ => panic!("Embedded output must be on a process or cluster"),
833 };
834 D::register_embedded_output(
835 &mut refcell_env.borrow_mut(),
836 location_key,
837 ident,
838 &element_type,
839 );
840 }
841 },
842 &mut |n| {
843 if let HydroNode::Network {
844 name,
845 networking_info,
846 input,
847 instantiate_fn,
848 metadata,
849 ..
850 } = n
851 {
852 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
853 DebugInstantiate::Building => instantiate_network::<D>(
854 &mut refcell_env.borrow_mut(),
855 input.metadata().location_id.root(),
856 metadata.location_id.root(),
857 processes,
858 clusters,
859 name.as_deref(),
860 networking_info,
861 ),
862
863 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
864 };
865
866 *instantiate_fn = DebugInstantiateFinalized {
867 sink: sink_expr,
868 source: source_expr,
869 connect_fn: Some(connect_fn),
870 }
871 .into();
872 } else if let HydroNode::ExternalInput {
873 from_external_key,
874 from_port_id,
875 from_many,
876 codec_type,
877 port_hint,
878 instantiate_fn,
879 metadata,
880 ..
881 } = n
882 {
883 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
884 DebugInstantiate::Building => {
885 let from_node = externals
886 .get(*from_external_key)
887 .unwrap_or_else(|| {
888 panic!(
889 "A external used in the graph was not instantiated: {}",
890 from_external_key,
891 )
892 })
893 .clone();
894
895 match metadata.location_id.root() {
896 &LocationId::Process(process_key) => {
897 let to_node = processes
898 .get(process_key)
899 .unwrap_or_else(|| {
900 panic!("A process used in the graph was not instantiated: {}", process_key)
901 })
902 .clone();
903
904 let sink_port = from_node.next_port();
905 let source_port = to_node.next_port();
906
907 from_node.register(*from_port_id, sink_port.clone());
908
909 (
910 (
911 parse_quote!(DUMMY),
912 if *from_many {
913 D::e2o_many_source(
914 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
915 &to_node, &source_port,
916 codec_type.0.as_ref(),
917 format!("{}_{}", *from_external_key, *from_port_id)
918 )
919 } else {
920 D::e2o_source(
921 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
922 &from_node, &sink_port,
923 &to_node, &source_port,
924 codec_type.0.as_ref(),
925 format!("{}_{}", *from_external_key, *from_port_id)
926 )
927 },
928 ),
929 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
930 )
931 }
932 LocationId::Cluster(_) => todo!(),
933 _ => panic!()
934 }
935 },
936
937 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
938 };
939
940 *instantiate_fn = DebugInstantiateFinalized {
941 sink: sink_expr,
942 source: source_expr,
943 connect_fn: Some(connect_fn),
944 }
945 .into();
946 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
947 let element_type = match &metadata.collection_kind {
948 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
949 _ => panic!("Embedded source must have Stream collection kind"),
950 };
951 let location_key = match metadata.location_id.root() {
952 LocationId::Process(key) | LocationId::Cluster(key) => *key,
953 _ => panic!("Embedded source must be on a process or cluster"),
954 };
955 D::register_embedded_input(
956 &mut refcell_env.borrow_mut(),
957 location_key,
958 ident,
959 &element_type,
960 );
961 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
962 match state {
963 ClusterMembersState::Uninit => {
964 let at_location = metadata.location_id.root().clone();
965 let key = (at_location.clone(), LocationId::Cluster(location_id.key()));
966 if refcell_seen_cluster_members.borrow_mut().insert(key) {
967 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
969 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
970 &(),
971 );
972 *state = ClusterMembersState::Stream(expr.into());
973 } else {
974 *state = ClusterMembersState::Tee(at_location, location_id.clone());
976 }
977 }
978 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
979 panic!("cluster members already finalized");
980 }
981 }
982 }
983 },
984 seen_tees,
985 false,
986 );
987 }
988
989 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
990 self.transform_bottom_up(
991 &mut |l| {
992 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
993 match instantiate_fn {
994 DebugInstantiate::Building => panic!("network not built"),
995
996 DebugInstantiate::Finalized(finalized) => {
997 (finalized.connect_fn.take().unwrap())();
998 }
999 }
1000 }
1001 },
1002 &mut |n| {
1003 if let HydroNode::Network { instantiate_fn, .. }
1004 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1005 {
1006 match instantiate_fn {
1007 DebugInstantiate::Building => panic!("network not built"),
1008
1009 DebugInstantiate::Finalized(finalized) => {
1010 (finalized.connect_fn.take().unwrap())();
1011 }
1012 }
1013 }
1014 },
1015 seen_tees,
1016 false,
1017 );
1018 }
1019
1020 pub fn transform_bottom_up(
1021 &mut self,
1022 transform_root: &mut impl FnMut(&mut HydroRoot),
1023 transform_node: &mut impl FnMut(&mut HydroNode),
1024 seen_tees: &mut SeenTees,
1025 check_well_formed: bool,
1026 ) {
1027 self.transform_children(
1028 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1029 seen_tees,
1030 );
1031
1032 transform_root(self);
1033 }
1034
1035 pub fn transform_children(
1036 &mut self,
1037 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1038 seen_tees: &mut SeenTees,
1039 ) {
1040 match self {
1041 HydroRoot::ForEach { input, .. }
1042 | HydroRoot::SendExternal { input, .. }
1043 | HydroRoot::DestSink { input, .. }
1044 | HydroRoot::CycleSink { input, .. }
1045 | HydroRoot::EmbeddedOutput { input, .. } => {
1046 transform(input, seen_tees);
1047 }
1048 }
1049 }
1050
1051 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
1052 match self {
1053 HydroRoot::ForEach {
1054 f,
1055 input,
1056 op_metadata,
1057 } => HydroRoot::ForEach {
1058 f: f.clone(),
1059 input: Box::new(input.deep_clone(seen_tees)),
1060 op_metadata: op_metadata.clone(),
1061 },
1062 HydroRoot::SendExternal {
1063 to_external_key,
1064 to_port_id,
1065 to_many,
1066 unpaired,
1067 serialize_fn,
1068 instantiate_fn,
1069 input,
1070 op_metadata,
1071 } => HydroRoot::SendExternal {
1072 to_external_key: *to_external_key,
1073 to_port_id: *to_port_id,
1074 to_many: *to_many,
1075 unpaired: *unpaired,
1076 serialize_fn: serialize_fn.clone(),
1077 instantiate_fn: instantiate_fn.clone(),
1078 input: Box::new(input.deep_clone(seen_tees)),
1079 op_metadata: op_metadata.clone(),
1080 },
1081 HydroRoot::DestSink {
1082 sink,
1083 input,
1084 op_metadata,
1085 } => HydroRoot::DestSink {
1086 sink: sink.clone(),
1087 input: Box::new(input.deep_clone(seen_tees)),
1088 op_metadata: op_metadata.clone(),
1089 },
1090 HydroRoot::CycleSink {
1091 cycle_id,
1092 input,
1093 op_metadata,
1094 } => HydroRoot::CycleSink {
1095 cycle_id: *cycle_id,
1096 input: Box::new(input.deep_clone(seen_tees)),
1097 op_metadata: op_metadata.clone(),
1098 },
1099 HydroRoot::EmbeddedOutput {
1100 ident,
1101 input,
1102 op_metadata,
1103 } => HydroRoot::EmbeddedOutput {
1104 ident: ident.clone(),
1105 input: Box::new(input.deep_clone(seen_tees)),
1106 op_metadata: op_metadata.clone(),
1107 },
1108 }
1109 }
1110
1111 #[cfg(feature = "build")]
1112 pub fn emit(
1113 &mut self,
1114 graph_builders: &mut dyn DfirBuilder,
1115 seen_tees: &mut SeenTees,
1116 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1117 next_stmt_id: &mut usize,
1118 ) {
1119 self.emit_core(
1120 &mut BuildersOrCallback::<
1121 fn(&mut HydroRoot, &mut usize),
1122 fn(&mut HydroNode, &mut usize),
1123 >::Builders(graph_builders),
1124 seen_tees,
1125 built_tees,
1126 next_stmt_id,
1127 );
1128 }
1129
1130 #[cfg(feature = "build")]
1131 pub fn emit_core(
1132 &mut self,
1133 builders_or_callback: &mut BuildersOrCallback<
1134 impl FnMut(&mut HydroRoot, &mut usize),
1135 impl FnMut(&mut HydroNode, &mut usize),
1136 >,
1137 seen_tees: &mut SeenTees,
1138 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1139 next_stmt_id: &mut usize,
1140 ) {
1141 match self {
1142 HydroRoot::ForEach { f, input, .. } => {
1143 let input_ident =
1144 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1145
1146 match builders_or_callback {
1147 BuildersOrCallback::Builders(graph_builders) => {
1148 graph_builders
1149 .get_dfir_mut(&input.metadata().location_id)
1150 .add_dfir(
1151 parse_quote! {
1152 #input_ident -> for_each(#f);
1153 },
1154 None,
1155 Some(&next_stmt_id.to_string()),
1156 );
1157 }
1158 BuildersOrCallback::Callback(leaf_callback, _) => {
1159 leaf_callback(self, next_stmt_id);
1160 }
1161 }
1162
1163 *next_stmt_id += 1;
1164 }
1165
1166 HydroRoot::SendExternal {
1167 serialize_fn,
1168 instantiate_fn,
1169 input,
1170 ..
1171 } => {
1172 let input_ident =
1173 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1174
1175 match builders_or_callback {
1176 BuildersOrCallback::Builders(graph_builders) => {
1177 let (sink_expr, _) = match instantiate_fn {
1178 DebugInstantiate::Building => (
1179 syn::parse_quote!(DUMMY_SINK),
1180 syn::parse_quote!(DUMMY_SOURCE),
1181 ),
1182
1183 DebugInstantiate::Finalized(finalized) => {
1184 (finalized.sink.clone(), finalized.source.clone())
1185 }
1186 };
1187
1188 graph_builders.create_external_output(
1189 &input.metadata().location_id,
1190 sink_expr,
1191 &input_ident,
1192 serialize_fn.as_ref(),
1193 *next_stmt_id,
1194 );
1195 }
1196 BuildersOrCallback::Callback(leaf_callback, _) => {
1197 leaf_callback(self, next_stmt_id);
1198 }
1199 }
1200
1201 *next_stmt_id += 1;
1202 }
1203
1204 HydroRoot::DestSink { sink, input, .. } => {
1205 let input_ident =
1206 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1207
1208 match builders_or_callback {
1209 BuildersOrCallback::Builders(graph_builders) => {
1210 graph_builders
1211 .get_dfir_mut(&input.metadata().location_id)
1212 .add_dfir(
1213 parse_quote! {
1214 #input_ident -> dest_sink(#sink);
1215 },
1216 None,
1217 Some(&next_stmt_id.to_string()),
1218 );
1219 }
1220 BuildersOrCallback::Callback(leaf_callback, _) => {
1221 leaf_callback(self, next_stmt_id);
1222 }
1223 }
1224
1225 *next_stmt_id += 1;
1226 }
1227
1228 HydroRoot::CycleSink {
1229 cycle_id, input, ..
1230 } => {
1231 let input_ident =
1232 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1233
1234 match builders_or_callback {
1235 BuildersOrCallback::Builders(graph_builders) => {
1236 let elem_type: syn::Type = match &input.metadata().collection_kind {
1237 CollectionKind::KeyedSingleton {
1238 key_type,
1239 value_type,
1240 ..
1241 }
1242 | CollectionKind::KeyedStream {
1243 key_type,
1244 value_type,
1245 ..
1246 } => {
1247 parse_quote!((#key_type, #value_type))
1248 }
1249 CollectionKind::Stream { element_type, .. }
1250 | CollectionKind::Singleton { element_type, .. }
1251 | CollectionKind::Optional { element_type, .. } => {
1252 parse_quote!(#element_type)
1253 }
1254 };
1255
1256 let cycle_id_ident = cycle_id.as_ident();
1257 graph_builders
1258 .get_dfir_mut(&input.metadata().location_id)
1259 .add_dfir(
1260 parse_quote! {
1261 #cycle_id_ident = #input_ident -> identity::<#elem_type>();
1262 },
1263 None,
1264 None,
1265 );
1266 }
1267 BuildersOrCallback::Callback(_, _) => {}
1269 }
1270 }
1271
1272 HydroRoot::EmbeddedOutput { ident, input, .. } => {
1273 let input_ident =
1274 input.emit_core(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1275
1276 match builders_or_callback {
1277 BuildersOrCallback::Builders(graph_builders) => {
1278 graph_builders
1279 .get_dfir_mut(&input.metadata().location_id)
1280 .add_dfir(
1281 parse_quote! {
1282 #input_ident -> for_each(&mut #ident);
1283 },
1284 None,
1285 Some(&next_stmt_id.to_string()),
1286 );
1287 }
1288 BuildersOrCallback::Callback(leaf_callback, _) => {
1289 leaf_callback(self, next_stmt_id);
1290 }
1291 }
1292
1293 *next_stmt_id += 1;
1294 }
1295 }
1296 }
1297
1298 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1299 match self {
1300 HydroRoot::ForEach { op_metadata, .. }
1301 | HydroRoot::SendExternal { op_metadata, .. }
1302 | HydroRoot::DestSink { op_metadata, .. }
1303 | HydroRoot::CycleSink { op_metadata, .. }
1304 | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1305 }
1306 }
1307
1308 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1309 match self {
1310 HydroRoot::ForEach { op_metadata, .. }
1311 | HydroRoot::SendExternal { op_metadata, .. }
1312 | HydroRoot::DestSink { op_metadata, .. }
1313 | HydroRoot::CycleSink { op_metadata, .. }
1314 | HydroRoot::EmbeddedOutput { op_metadata, .. } => op_metadata,
1315 }
1316 }
1317
1318 pub fn input(&self) -> &HydroNode {
1319 match self {
1320 HydroRoot::ForEach { input, .. }
1321 | HydroRoot::SendExternal { input, .. }
1322 | HydroRoot::DestSink { input, .. }
1323 | HydroRoot::CycleSink { input, .. }
1324 | HydroRoot::EmbeddedOutput { input, .. } => input,
1325 }
1326 }
1327
1328 pub fn input_metadata(&self) -> &HydroIrMetadata {
1329 self.input().metadata()
1330 }
1331
1332 pub fn print_root(&self) -> String {
1333 match self {
1334 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1335 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1336 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1337 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1338 HydroRoot::EmbeddedOutput { ident, .. } => {
1339 format!("EmbeddedOutput({})", ident)
1340 }
1341 }
1342 }
1343
1344 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1345 match self {
1346 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1347 transform(f);
1348 }
1349 HydroRoot::SendExternal { .. }
1350 | HydroRoot::CycleSink { .. }
1351 | HydroRoot::EmbeddedOutput { .. } => {}
1352 }
1353 }
1354}
1355
1356#[cfg(feature = "build")]
1357pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1358 let mut builders = SecondaryMap::new();
1359 let mut seen_tees = HashMap::new();
1360 let mut built_tees = HashMap::new();
1361 let mut next_stmt_id = 0;
1362 for leaf in ir {
1363 leaf.emit(
1364 &mut builders,
1365 &mut seen_tees,
1366 &mut built_tees,
1367 &mut next_stmt_id,
1368 );
1369 }
1370 builders
1371}
1372
1373#[cfg(feature = "build")]
1374pub fn traverse_dfir(
1375 ir: &mut [HydroRoot],
1376 transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1377 transform_node: impl FnMut(&mut HydroNode, &mut usize),
1378) {
1379 let mut seen_tees = HashMap::new();
1380 let mut built_tees = HashMap::new();
1381 let mut next_stmt_id = 0;
1382 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1383 ir.iter_mut().for_each(|leaf| {
1384 leaf.emit_core(
1385 &mut callback,
1386 &mut seen_tees,
1387 &mut built_tees,
1388 &mut next_stmt_id,
1389 );
1390 });
1391}
1392
1393pub fn transform_bottom_up(
1394 ir: &mut [HydroRoot],
1395 transform_root: &mut impl FnMut(&mut HydroRoot),
1396 transform_node: &mut impl FnMut(&mut HydroNode),
1397 check_well_formed: bool,
1398) {
1399 let mut seen_tees = HashMap::new();
1400 ir.iter_mut().for_each(|leaf| {
1401 leaf.transform_bottom_up(
1402 transform_root,
1403 transform_node,
1404 &mut seen_tees,
1405 check_well_formed,
1406 );
1407 });
1408}
1409
1410pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1411 let mut seen_tees = HashMap::new();
1412 ir.iter()
1413 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1414 .collect()
1415}
1416
1417type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1418thread_local! {
1419 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1420}
1421
1422pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1423 PRINTED_TEES.with(|printed_tees| {
1424 let mut printed_tees_mut = printed_tees.borrow_mut();
1425 *printed_tees_mut = Some((0, HashMap::new()));
1426 drop(printed_tees_mut);
1427
1428 let ret = f();
1429
1430 let mut printed_tees_mut = printed_tees.borrow_mut();
1431 *printed_tees_mut = None;
1432
1433 ret
1434 })
1435}
1436
1437pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1438
1439impl TeeNode {
1440 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1441 Rc::as_ptr(&self.0)
1442 }
1443}
1444
1445impl Debug for TeeNode {
1446 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1447 PRINTED_TEES.with(|printed_tees| {
1448 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1449 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1450
1451 if let Some(printed_tees_mut) = printed_tees_mut {
1452 if let Some(existing) = printed_tees_mut
1453 .1
1454 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1455 {
1456 write!(f, "<tee {}>", existing)
1457 } else {
1458 let next_id = printed_tees_mut.0;
1459 printed_tees_mut.0 += 1;
1460 printed_tees_mut
1461 .1
1462 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1463 drop(printed_tees_mut_borrow);
1464 write!(f, "<tee {}>: ", next_id)?;
1465 Debug::fmt(&self.0.borrow(), f)
1466 }
1467 } else {
1468 drop(printed_tees_mut_borrow);
1469 write!(f, "<tee>: ")?;
1470 Debug::fmt(&self.0.borrow(), f)
1471 }
1472 })
1473 }
1474}
1475
1476impl Hash for TeeNode {
1477 fn hash<H: Hasher>(&self, state: &mut H) {
1478 self.0.borrow_mut().hash(state);
1479 }
1480}
1481
1482#[derive(Clone, PartialEq, Eq, Debug)]
1483pub enum BoundKind {
1484 Unbounded,
1485 Bounded,
1486}
1487
1488#[derive(Clone, PartialEq, Eq, Debug)]
1489pub enum StreamOrder {
1490 NoOrder,
1491 TotalOrder,
1492}
1493
1494#[derive(Clone, PartialEq, Eq, Debug)]
1495pub enum StreamRetry {
1496 AtLeastOnce,
1497 ExactlyOnce,
1498}
1499
1500#[derive(Clone, PartialEq, Eq, Debug)]
1501pub enum KeyedSingletonBoundKind {
1502 Unbounded,
1503 BoundedValue,
1504 Bounded,
1505}
1506
1507#[derive(Clone, PartialEq, Eq, Debug)]
1508pub enum CollectionKind {
1509 Stream {
1510 bound: BoundKind,
1511 order: StreamOrder,
1512 retry: StreamRetry,
1513 element_type: DebugType,
1514 },
1515 Singleton {
1516 bound: BoundKind,
1517 element_type: DebugType,
1518 },
1519 Optional {
1520 bound: BoundKind,
1521 element_type: DebugType,
1522 },
1523 KeyedStream {
1524 bound: BoundKind,
1525 value_order: StreamOrder,
1526 value_retry: StreamRetry,
1527 key_type: DebugType,
1528 value_type: DebugType,
1529 },
1530 KeyedSingleton {
1531 bound: KeyedSingletonBoundKind,
1532 key_type: DebugType,
1533 value_type: DebugType,
1534 },
1535}
1536
1537impl CollectionKind {
1538 pub fn is_bounded(&self) -> bool {
1539 matches!(
1540 self,
1541 CollectionKind::Stream {
1542 bound: BoundKind::Bounded,
1543 ..
1544 } | CollectionKind::Singleton {
1545 bound: BoundKind::Bounded,
1546 ..
1547 } | CollectionKind::Optional {
1548 bound: BoundKind::Bounded,
1549 ..
1550 } | CollectionKind::KeyedStream {
1551 bound: BoundKind::Bounded,
1552 ..
1553 } | CollectionKind::KeyedSingleton {
1554 bound: KeyedSingletonBoundKind::Bounded,
1555 ..
1556 }
1557 )
1558 }
1559}
1560
1561#[derive(Clone)]
1562pub struct HydroIrMetadata {
1563 pub location_id: LocationId,
1564 pub collection_kind: CollectionKind,
1565 pub cardinality: Option<usize>,
1566 pub tag: Option<String>,
1567 pub op: HydroIrOpMetadata,
1568}
1569
1570impl Hash for HydroIrMetadata {
1572 fn hash<H: Hasher>(&self, _: &mut H) {}
1573}
1574
1575impl PartialEq for HydroIrMetadata {
1576 fn eq(&self, _: &Self) -> bool {
1577 true
1578 }
1579}
1580
1581impl Eq for HydroIrMetadata {}
1582
1583impl Debug for HydroIrMetadata {
1584 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1585 f.debug_struct("HydroIrMetadata")
1586 .field("location_id", &self.location_id)
1587 .field("collection_kind", &self.collection_kind)
1588 .finish()
1589 }
1590}
1591
1592#[derive(Clone)]
1595pub struct HydroIrOpMetadata {
1596 pub backtrace: Backtrace,
1597 pub cpu_usage: Option<f64>,
1598 pub network_recv_cpu_usage: Option<f64>,
1599 pub id: Option<usize>,
1600}
1601
1602impl HydroIrOpMetadata {
1603 #[expect(
1604 clippy::new_without_default,
1605 reason = "explicit calls to new ensure correct backtrace bounds"
1606 )]
1607 pub fn new() -> HydroIrOpMetadata {
1608 Self::new_with_skip(1)
1609 }
1610
1611 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1612 HydroIrOpMetadata {
1613 backtrace: Backtrace::get_backtrace(2 + skip_count),
1614 cpu_usage: None,
1615 network_recv_cpu_usage: None,
1616 id: None,
1617 }
1618 }
1619}
1620
1621impl Debug for HydroIrOpMetadata {
1622 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1623 f.debug_struct("HydroIrOpMetadata").finish()
1624 }
1625}
1626
1627impl Hash for HydroIrOpMetadata {
1628 fn hash<H: Hasher>(&self, _: &mut H) {}
1629}
1630
1631#[derive(Debug, Hash)]
1634pub enum HydroNode {
1635 Placeholder,
1636
1637 Cast {
1645 inner: Box<HydroNode>,
1646 metadata: HydroIrMetadata,
1647 },
1648
1649 ObserveNonDet {
1655 inner: Box<HydroNode>,
1656 trusted: bool, metadata: HydroIrMetadata,
1658 },
1659
1660 Source {
1661 source: HydroSource,
1662 metadata: HydroIrMetadata,
1663 },
1664
1665 SingletonSource {
1666 value: DebugExpr,
1667 metadata: HydroIrMetadata,
1668 },
1669
1670 CycleSource {
1671 cycle_id: CycleId,
1672 metadata: HydroIrMetadata,
1673 },
1674
1675 Tee {
1676 inner: TeeNode,
1677 metadata: HydroIrMetadata,
1678 },
1679
1680 BeginAtomic {
1681 inner: Box<HydroNode>,
1682 metadata: HydroIrMetadata,
1683 },
1684
1685 EndAtomic {
1686 inner: Box<HydroNode>,
1687 metadata: HydroIrMetadata,
1688 },
1689
1690 Batch {
1691 inner: Box<HydroNode>,
1692 metadata: HydroIrMetadata,
1693 },
1694
1695 YieldConcat {
1696 inner: Box<HydroNode>,
1697 metadata: HydroIrMetadata,
1698 },
1699
1700 Chain {
1701 first: Box<HydroNode>,
1702 second: Box<HydroNode>,
1703 metadata: HydroIrMetadata,
1704 },
1705
1706 ChainFirst {
1707 first: Box<HydroNode>,
1708 second: Box<HydroNode>,
1709 metadata: HydroIrMetadata,
1710 },
1711
1712 CrossProduct {
1713 left: Box<HydroNode>,
1714 right: Box<HydroNode>,
1715 metadata: HydroIrMetadata,
1716 },
1717
1718 CrossSingleton {
1719 left: Box<HydroNode>,
1720 right: Box<HydroNode>,
1721 metadata: HydroIrMetadata,
1722 },
1723
1724 Join {
1725 left: Box<HydroNode>,
1726 right: Box<HydroNode>,
1727 metadata: HydroIrMetadata,
1728 },
1729
1730 Difference {
1731 pos: Box<HydroNode>,
1732 neg: Box<HydroNode>,
1733 metadata: HydroIrMetadata,
1734 },
1735
1736 AntiJoin {
1737 pos: Box<HydroNode>,
1738 neg: Box<HydroNode>,
1739 metadata: HydroIrMetadata,
1740 },
1741
1742 ResolveFutures {
1743 input: Box<HydroNode>,
1744 metadata: HydroIrMetadata,
1745 },
1746 ResolveFuturesOrdered {
1747 input: Box<HydroNode>,
1748 metadata: HydroIrMetadata,
1749 },
1750
1751 Map {
1752 f: DebugExpr,
1753 input: Box<HydroNode>,
1754 metadata: HydroIrMetadata,
1755 },
1756 FlatMap {
1757 f: DebugExpr,
1758 input: Box<HydroNode>,
1759 metadata: HydroIrMetadata,
1760 },
1761 Filter {
1762 f: DebugExpr,
1763 input: Box<HydroNode>,
1764 metadata: HydroIrMetadata,
1765 },
1766 FilterMap {
1767 f: DebugExpr,
1768 input: Box<HydroNode>,
1769 metadata: HydroIrMetadata,
1770 },
1771
1772 DeferTick {
1773 input: Box<HydroNode>,
1774 metadata: HydroIrMetadata,
1775 },
1776 Enumerate {
1777 input: Box<HydroNode>,
1778 metadata: HydroIrMetadata,
1779 },
1780 Inspect {
1781 f: DebugExpr,
1782 input: Box<HydroNode>,
1783 metadata: HydroIrMetadata,
1784 },
1785
1786 Unique {
1787 input: Box<HydroNode>,
1788 metadata: HydroIrMetadata,
1789 },
1790
1791 Sort {
1792 input: Box<HydroNode>,
1793 metadata: HydroIrMetadata,
1794 },
1795 Fold {
1796 init: DebugExpr,
1797 acc: DebugExpr,
1798 input: Box<HydroNode>,
1799 metadata: HydroIrMetadata,
1800 },
1801
1802 Scan {
1803 init: DebugExpr,
1804 acc: DebugExpr,
1805 input: Box<HydroNode>,
1806 metadata: HydroIrMetadata,
1807 },
1808 FoldKeyed {
1809 init: DebugExpr,
1810 acc: DebugExpr,
1811 input: Box<HydroNode>,
1812 metadata: HydroIrMetadata,
1813 },
1814
1815 Reduce {
1816 f: DebugExpr,
1817 input: Box<HydroNode>,
1818 metadata: HydroIrMetadata,
1819 },
1820 ReduceKeyed {
1821 f: DebugExpr,
1822 input: Box<HydroNode>,
1823 metadata: HydroIrMetadata,
1824 },
1825 ReduceKeyedWatermark {
1826 f: DebugExpr,
1827 input: Box<HydroNode>,
1828 watermark: Box<HydroNode>,
1829 metadata: HydroIrMetadata,
1830 },
1831
1832 Network {
1833 name: Option<String>,
1834 networking_info: crate::networking::NetworkingInfo,
1835 serialize_fn: Option<DebugExpr>,
1836 instantiate_fn: DebugInstantiate,
1837 deserialize_fn: Option<DebugExpr>,
1838 input: Box<HydroNode>,
1839 metadata: HydroIrMetadata,
1840 },
1841
1842 ExternalInput {
1843 from_external_key: LocationKey,
1844 from_port_id: ExternalPortId,
1845 from_many: bool,
1846 codec_type: DebugType,
1847 port_hint: NetworkHint,
1848 instantiate_fn: DebugInstantiate,
1849 deserialize_fn: Option<DebugExpr>,
1850 metadata: HydroIrMetadata,
1851 },
1852
1853 Counter {
1854 tag: String,
1855 duration: DebugExpr,
1856 prefix: String,
1857 input: Box<HydroNode>,
1858 metadata: HydroIrMetadata,
1859 },
1860}
1861
1862pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1863pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1864
1865impl HydroNode {
1866 pub fn transform_bottom_up(
1867 &mut self,
1868 transform: &mut impl FnMut(&mut HydroNode),
1869 seen_tees: &mut SeenTees,
1870 check_well_formed: bool,
1871 ) {
1872 self.transform_children(
1873 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1874 seen_tees,
1875 );
1876
1877 transform(self);
1878
1879 let self_location = self.metadata().location_id.root();
1880
1881 if check_well_formed {
1882 match &*self {
1883 HydroNode::Network { .. } => {}
1884 _ => {
1885 self.input_metadata().iter().for_each(|i| {
1886 if i.location_id.root() != self_location {
1887 panic!(
1888 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1889 i,
1890 i.location_id.root(),
1891 self,
1892 self_location
1893 )
1894 }
1895 });
1896 }
1897 }
1898 }
1899 }
1900
1901 #[inline(always)]
1902 pub fn transform_children(
1903 &mut self,
1904 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1905 seen_tees: &mut SeenTees,
1906 ) {
1907 match self {
1908 HydroNode::Placeholder => {
1909 panic!();
1910 }
1911
1912 HydroNode::Source { .. }
1913 | HydroNode::SingletonSource { .. }
1914 | HydroNode::CycleSource { .. }
1915 | HydroNode::ExternalInput { .. } => {}
1916
1917 HydroNode::Tee { inner, .. } => {
1918 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1919 *inner = TeeNode(transformed.clone());
1920 } else {
1921 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1922 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1923 let mut orig = inner.0.replace(HydroNode::Placeholder);
1924 transform(&mut orig, seen_tees);
1925 *transformed_cell.borrow_mut() = orig;
1926 *inner = TeeNode(transformed_cell);
1927 }
1928 }
1929
1930 HydroNode::Cast { inner, .. }
1931 | HydroNode::ObserveNonDet { inner, .. }
1932 | HydroNode::BeginAtomic { inner, .. }
1933 | HydroNode::EndAtomic { inner, .. }
1934 | HydroNode::Batch { inner, .. }
1935 | HydroNode::YieldConcat { inner, .. } => {
1936 transform(inner.as_mut(), seen_tees);
1937 }
1938
1939 HydroNode::Chain { first, second, .. } => {
1940 transform(first.as_mut(), seen_tees);
1941 transform(second.as_mut(), seen_tees);
1942 }
1943
1944 HydroNode::ChainFirst { first, second, .. } => {
1945 transform(first.as_mut(), seen_tees);
1946 transform(second.as_mut(), seen_tees);
1947 }
1948
1949 HydroNode::CrossSingleton { left, right, .. }
1950 | HydroNode::CrossProduct { left, right, .. }
1951 | HydroNode::Join { left, right, .. } => {
1952 transform(left.as_mut(), seen_tees);
1953 transform(right.as_mut(), seen_tees);
1954 }
1955
1956 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1957 transform(pos.as_mut(), seen_tees);
1958 transform(neg.as_mut(), seen_tees);
1959 }
1960
1961 HydroNode::ReduceKeyedWatermark {
1962 input, watermark, ..
1963 } => {
1964 transform(input.as_mut(), seen_tees);
1965 transform(watermark.as_mut(), seen_tees);
1966 }
1967
1968 HydroNode::Map { input, .. }
1969 | HydroNode::ResolveFutures { input, .. }
1970 | HydroNode::ResolveFuturesOrdered { input, .. }
1971 | HydroNode::FlatMap { input, .. }
1972 | HydroNode::Filter { input, .. }
1973 | HydroNode::FilterMap { input, .. }
1974 | HydroNode::Sort { input, .. }
1975 | HydroNode::DeferTick { input, .. }
1976 | HydroNode::Enumerate { input, .. }
1977 | HydroNode::Inspect { input, .. }
1978 | HydroNode::Unique { input, .. }
1979 | HydroNode::Network { input, .. }
1980 | HydroNode::Fold { input, .. }
1981 | HydroNode::Scan { input, .. }
1982 | HydroNode::FoldKeyed { input, .. }
1983 | HydroNode::Reduce { input, .. }
1984 | HydroNode::ReduceKeyed { input, .. }
1985 | HydroNode::Counter { input, .. } => {
1986 transform(input.as_mut(), seen_tees);
1987 }
1988 }
1989 }
1990
1991 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1992 match self {
1993 HydroNode::Placeholder => HydroNode::Placeholder,
1994 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1995 inner: Box::new(inner.deep_clone(seen_tees)),
1996 metadata: metadata.clone(),
1997 },
1998 HydroNode::ObserveNonDet {
1999 inner,
2000 trusted,
2001 metadata,
2002 } => HydroNode::ObserveNonDet {
2003 inner: Box::new(inner.deep_clone(seen_tees)),
2004 trusted: *trusted,
2005 metadata: metadata.clone(),
2006 },
2007 HydroNode::Source { source, metadata } => HydroNode::Source {
2008 source: source.clone(),
2009 metadata: metadata.clone(),
2010 },
2011 HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
2012 value: value.clone(),
2013 metadata: metadata.clone(),
2014 },
2015 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2016 cycle_id: *cycle_id,
2017 metadata: metadata.clone(),
2018 },
2019 HydroNode::Tee { inner, metadata } => {
2020 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2021 HydroNode::Tee {
2022 inner: TeeNode(transformed.clone()),
2023 metadata: metadata.clone(),
2024 }
2025 } else {
2026 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2027 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2028 let cloned = inner.0.borrow().deep_clone(seen_tees);
2029 *new_rc.borrow_mut() = cloned;
2030 HydroNode::Tee {
2031 inner: TeeNode(new_rc),
2032 metadata: metadata.clone(),
2033 }
2034 }
2035 }
2036 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2037 inner: Box::new(inner.deep_clone(seen_tees)),
2038 metadata: metadata.clone(),
2039 },
2040 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2041 inner: Box::new(inner.deep_clone(seen_tees)),
2042 metadata: metadata.clone(),
2043 },
2044 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2045 inner: Box::new(inner.deep_clone(seen_tees)),
2046 metadata: metadata.clone(),
2047 },
2048 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2049 inner: Box::new(inner.deep_clone(seen_tees)),
2050 metadata: metadata.clone(),
2051 },
2052 HydroNode::Chain {
2053 first,
2054 second,
2055 metadata,
2056 } => HydroNode::Chain {
2057 first: Box::new(first.deep_clone(seen_tees)),
2058 second: Box::new(second.deep_clone(seen_tees)),
2059 metadata: metadata.clone(),
2060 },
2061 HydroNode::ChainFirst {
2062 first,
2063 second,
2064 metadata,
2065 } => HydroNode::ChainFirst {
2066 first: Box::new(first.deep_clone(seen_tees)),
2067 second: Box::new(second.deep_clone(seen_tees)),
2068 metadata: metadata.clone(),
2069 },
2070 HydroNode::CrossProduct {
2071 left,
2072 right,
2073 metadata,
2074 } => HydroNode::CrossProduct {
2075 left: Box::new(left.deep_clone(seen_tees)),
2076 right: Box::new(right.deep_clone(seen_tees)),
2077 metadata: metadata.clone(),
2078 },
2079 HydroNode::CrossSingleton {
2080 left,
2081 right,
2082 metadata,
2083 } => HydroNode::CrossSingleton {
2084 left: Box::new(left.deep_clone(seen_tees)),
2085 right: Box::new(right.deep_clone(seen_tees)),
2086 metadata: metadata.clone(),
2087 },
2088 HydroNode::Join {
2089 left,
2090 right,
2091 metadata,
2092 } => HydroNode::Join {
2093 left: Box::new(left.deep_clone(seen_tees)),
2094 right: Box::new(right.deep_clone(seen_tees)),
2095 metadata: metadata.clone(),
2096 },
2097 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
2098 pos: Box::new(pos.deep_clone(seen_tees)),
2099 neg: Box::new(neg.deep_clone(seen_tees)),
2100 metadata: metadata.clone(),
2101 },
2102 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
2103 pos: Box::new(pos.deep_clone(seen_tees)),
2104 neg: Box::new(neg.deep_clone(seen_tees)),
2105 metadata: metadata.clone(),
2106 },
2107 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
2108 input: Box::new(input.deep_clone(seen_tees)),
2109 metadata: metadata.clone(),
2110 },
2111 HydroNode::ResolveFuturesOrdered { input, metadata } => {
2112 HydroNode::ResolveFuturesOrdered {
2113 input: Box::new(input.deep_clone(seen_tees)),
2114 metadata: metadata.clone(),
2115 }
2116 }
2117 HydroNode::Map { f, input, metadata } => HydroNode::Map {
2118 f: f.clone(),
2119 input: Box::new(input.deep_clone(seen_tees)),
2120 metadata: metadata.clone(),
2121 },
2122 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
2123 f: f.clone(),
2124 input: Box::new(input.deep_clone(seen_tees)),
2125 metadata: metadata.clone(),
2126 },
2127 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
2128 f: f.clone(),
2129 input: Box::new(input.deep_clone(seen_tees)),
2130 metadata: metadata.clone(),
2131 },
2132 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
2133 f: f.clone(),
2134 input: Box::new(input.deep_clone(seen_tees)),
2135 metadata: metadata.clone(),
2136 },
2137 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2138 input: Box::new(input.deep_clone(seen_tees)),
2139 metadata: metadata.clone(),
2140 },
2141 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2142 input: Box::new(input.deep_clone(seen_tees)),
2143 metadata: metadata.clone(),
2144 },
2145 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2146 f: f.clone(),
2147 input: Box::new(input.deep_clone(seen_tees)),
2148 metadata: metadata.clone(),
2149 },
2150 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2151 input: Box::new(input.deep_clone(seen_tees)),
2152 metadata: metadata.clone(),
2153 },
2154 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2155 input: Box::new(input.deep_clone(seen_tees)),
2156 metadata: metadata.clone(),
2157 },
2158 HydroNode::Fold {
2159 init,
2160 acc,
2161 input,
2162 metadata,
2163 } => HydroNode::Fold {
2164 init: init.clone(),
2165 acc: acc.clone(),
2166 input: Box::new(input.deep_clone(seen_tees)),
2167 metadata: metadata.clone(),
2168 },
2169 HydroNode::Scan {
2170 init,
2171 acc,
2172 input,
2173 metadata,
2174 } => HydroNode::Scan {
2175 init: init.clone(),
2176 acc: acc.clone(),
2177 input: Box::new(input.deep_clone(seen_tees)),
2178 metadata: metadata.clone(),
2179 },
2180 HydroNode::FoldKeyed {
2181 init,
2182 acc,
2183 input,
2184 metadata,
2185 } => HydroNode::FoldKeyed {
2186 init: init.clone(),
2187 acc: acc.clone(),
2188 input: Box::new(input.deep_clone(seen_tees)),
2189 metadata: metadata.clone(),
2190 },
2191 HydroNode::ReduceKeyedWatermark {
2192 f,
2193 input,
2194 watermark,
2195 metadata,
2196 } => HydroNode::ReduceKeyedWatermark {
2197 f: f.clone(),
2198 input: Box::new(input.deep_clone(seen_tees)),
2199 watermark: Box::new(watermark.deep_clone(seen_tees)),
2200 metadata: metadata.clone(),
2201 },
2202 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2203 f: f.clone(),
2204 input: Box::new(input.deep_clone(seen_tees)),
2205 metadata: metadata.clone(),
2206 },
2207 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2208 f: f.clone(),
2209 input: Box::new(input.deep_clone(seen_tees)),
2210 metadata: metadata.clone(),
2211 },
2212 HydroNode::Network {
2213 name,
2214 networking_info,
2215 serialize_fn,
2216 instantiate_fn,
2217 deserialize_fn,
2218 input,
2219 metadata,
2220 } => HydroNode::Network {
2221 name: name.clone(),
2222 networking_info: networking_info.clone(),
2223 serialize_fn: serialize_fn.clone(),
2224 instantiate_fn: instantiate_fn.clone(),
2225 deserialize_fn: deserialize_fn.clone(),
2226 input: Box::new(input.deep_clone(seen_tees)),
2227 metadata: metadata.clone(),
2228 },
2229 HydroNode::ExternalInput {
2230 from_external_key,
2231 from_port_id,
2232 from_many,
2233 codec_type,
2234 port_hint,
2235 instantiate_fn,
2236 deserialize_fn,
2237 metadata,
2238 } => HydroNode::ExternalInput {
2239 from_external_key: *from_external_key,
2240 from_port_id: *from_port_id,
2241 from_many: *from_many,
2242 codec_type: codec_type.clone(),
2243 port_hint: *port_hint,
2244 instantiate_fn: instantiate_fn.clone(),
2245 deserialize_fn: deserialize_fn.clone(),
2246 metadata: metadata.clone(),
2247 },
2248 HydroNode::Counter {
2249 tag,
2250 duration,
2251 prefix,
2252 input,
2253 metadata,
2254 } => HydroNode::Counter {
2255 tag: tag.clone(),
2256 duration: duration.clone(),
2257 prefix: prefix.clone(),
2258 input: Box::new(input.deep_clone(seen_tees)),
2259 metadata: metadata.clone(),
2260 },
2261 }
2262 }
2263
2264 #[cfg(feature = "build")]
2265 pub fn emit_core(
2266 &mut self,
2267 builders_or_callback: &mut BuildersOrCallback<
2268 impl FnMut(&mut HydroRoot, &mut usize),
2269 impl FnMut(&mut HydroNode, &mut usize),
2270 >,
2271 seen_tees: &mut SeenTees,
2272 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2273 next_stmt_id: &mut usize,
2274 ) -> syn::Ident {
2275 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2276
2277 self.transform_bottom_up(
2278 &mut |node: &mut HydroNode| {
2279 let out_location = node.metadata().location_id.clone();
2280 match node {
2281 HydroNode::Placeholder => {
2282 panic!()
2283 }
2284
2285 HydroNode::Cast { .. } => {
2286 match builders_or_callback {
2289 BuildersOrCallback::Builders(_) => {}
2290 BuildersOrCallback::Callback(_, node_callback) => {
2291 node_callback(node, next_stmt_id);
2292 }
2293 }
2294
2295 *next_stmt_id += 1;
2296 }
2298
2299 HydroNode::ObserveNonDet {
2300 inner,
2301 trusted,
2302 metadata,
2303 ..
2304 } => {
2305 let inner_ident = ident_stack.pop().unwrap();
2306
2307 let observe_ident =
2308 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2309
2310 match builders_or_callback {
2311 BuildersOrCallback::Builders(graph_builders) => {
2312 graph_builders.observe_nondet(
2313 *trusted,
2314 &inner.metadata().location_id,
2315 inner_ident,
2316 &inner.metadata().collection_kind,
2317 &observe_ident,
2318 &metadata.collection_kind,
2319 &metadata.op,
2320 );
2321 }
2322 BuildersOrCallback::Callback(_, node_callback) => {
2323 node_callback(node, next_stmt_id);
2324 }
2325 }
2326
2327 *next_stmt_id += 1;
2328
2329 ident_stack.push(observe_ident);
2330 }
2331
2332 HydroNode::Batch {
2333 inner, metadata, ..
2334 } => {
2335 let inner_ident = ident_stack.pop().unwrap();
2336
2337 let batch_ident =
2338 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2339
2340 match builders_or_callback {
2341 BuildersOrCallback::Builders(graph_builders) => {
2342 graph_builders.batch(
2343 inner_ident,
2344 &inner.metadata().location_id,
2345 &inner.metadata().collection_kind,
2346 &batch_ident,
2347 &out_location,
2348 &metadata.op,
2349 );
2350 }
2351 BuildersOrCallback::Callback(_, node_callback) => {
2352 node_callback(node, next_stmt_id);
2353 }
2354 }
2355
2356 *next_stmt_id += 1;
2357
2358 ident_stack.push(batch_ident);
2359 }
2360
2361 HydroNode::YieldConcat { inner, .. } => {
2362 let inner_ident = ident_stack.pop().unwrap();
2363
2364 let yield_ident =
2365 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2366
2367 match builders_or_callback {
2368 BuildersOrCallback::Builders(graph_builders) => {
2369 graph_builders.yield_from_tick(
2370 inner_ident,
2371 &inner.metadata().location_id,
2372 &inner.metadata().collection_kind,
2373 &yield_ident,
2374 &out_location,
2375 );
2376 }
2377 BuildersOrCallback::Callback(_, node_callback) => {
2378 node_callback(node, next_stmt_id);
2379 }
2380 }
2381
2382 *next_stmt_id += 1;
2383
2384 ident_stack.push(yield_ident);
2385 }
2386
2387 HydroNode::BeginAtomic { inner, metadata } => {
2388 let inner_ident = ident_stack.pop().unwrap();
2389
2390 let begin_ident =
2391 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2392
2393 match builders_or_callback {
2394 BuildersOrCallback::Builders(graph_builders) => {
2395 graph_builders.begin_atomic(
2396 inner_ident,
2397 &inner.metadata().location_id,
2398 &inner.metadata().collection_kind,
2399 &begin_ident,
2400 &out_location,
2401 &metadata.op,
2402 );
2403 }
2404 BuildersOrCallback::Callback(_, node_callback) => {
2405 node_callback(node, next_stmt_id);
2406 }
2407 }
2408
2409 *next_stmt_id += 1;
2410
2411 ident_stack.push(begin_ident);
2412 }
2413
2414 HydroNode::EndAtomic { inner, .. } => {
2415 let inner_ident = ident_stack.pop().unwrap();
2416
2417 let end_ident =
2418 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2419
2420 match builders_or_callback {
2421 BuildersOrCallback::Builders(graph_builders) => {
2422 graph_builders.end_atomic(
2423 inner_ident,
2424 &inner.metadata().location_id,
2425 &inner.metadata().collection_kind,
2426 &end_ident,
2427 );
2428 }
2429 BuildersOrCallback::Callback(_, node_callback) => {
2430 node_callback(node, next_stmt_id);
2431 }
2432 }
2433
2434 *next_stmt_id += 1;
2435
2436 ident_stack.push(end_ident);
2437 }
2438
2439 HydroNode::Source {
2440 source, metadata, ..
2441 } => {
2442 if let HydroSource::ExternalNetwork() = source {
2443 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2444 } else {
2445 let source_ident =
2446 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2447
2448 let source_stmt = match source {
2449 HydroSource::Stream(expr) => {
2450 debug_assert!(metadata.location_id.is_top_level());
2451 parse_quote! {
2452 #source_ident = source_stream(#expr);
2453 }
2454 }
2455
2456 HydroSource::ExternalNetwork() => {
2457 unreachable!()
2458 }
2459
2460 HydroSource::Iter(expr) => {
2461 if metadata.location_id.is_top_level() {
2462 parse_quote! {
2463 #source_ident = source_iter(#expr);
2464 }
2465 } else {
2466 parse_quote! {
2468 #source_ident = source_iter(#expr) -> persist::<'static>();
2469 }
2470 }
2471 }
2472
2473 HydroSource::Spin() => {
2474 debug_assert!(metadata.location_id.is_top_level());
2475 parse_quote! {
2476 #source_ident = spin();
2477 }
2478 }
2479
2480 HydroSource::ClusterMembers(target_loc, state) => {
2481 debug_assert!(metadata.location_id.is_top_level());
2482
2483 let members_tee_ident = syn::Ident::new(
2484 &format!(
2485 "__cluster_members_tee_{}_{}",
2486 metadata.location_id.root().key(),
2487 target_loc.key(),
2488 ),
2489 Span::call_site(),
2490 );
2491
2492 match state {
2493 ClusterMembersState::Stream(d) => {
2494 parse_quote! {
2495 #members_tee_ident = source_stream(#d) -> tee();
2496 #source_ident = #members_tee_ident;
2497 }
2498 },
2499 ClusterMembersState::Uninit => syn::parse_quote! {
2500 #source_ident = source_stream(DUMMY);
2501 },
2502 ClusterMembersState::Tee(..) => parse_quote! {
2503 #source_ident = #members_tee_ident;
2504 },
2505 }
2506 }
2507
2508 HydroSource::Embedded(ident) => {
2509 parse_quote! {
2510 #source_ident = source_stream(#ident);
2511 }
2512 }
2513 };
2514
2515 match builders_or_callback {
2516 BuildersOrCallback::Builders(graph_builders) => {
2517 let builder = graph_builders.get_dfir_mut(&out_location);
2518 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2519 }
2520 BuildersOrCallback::Callback(_, node_callback) => {
2521 node_callback(node, next_stmt_id);
2522 }
2523 }
2524
2525 *next_stmt_id += 1;
2526
2527 ident_stack.push(source_ident);
2528 }
2529 }
2530
2531 HydroNode::SingletonSource { value, metadata } => {
2532 let source_ident =
2533 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2534
2535 match builders_or_callback {
2536 BuildersOrCallback::Builders(graph_builders) => {
2537 let builder = graph_builders.get_dfir_mut(&out_location);
2538
2539 if metadata.location_id.is_top_level()
2540 && metadata.collection_kind.is_bounded()
2541 {
2542 builder.add_dfir(
2543 parse_quote! {
2544 #source_ident = source_iter([#value]);
2545 },
2546 None,
2547 Some(&next_stmt_id.to_string()),
2548 );
2549 } else {
2550 builder.add_dfir(
2551 parse_quote! {
2552 #source_ident = source_iter([#value]) -> persist::<'static>();
2553 },
2554 None,
2555 Some(&next_stmt_id.to_string()),
2556 );
2557 }
2558 }
2559 BuildersOrCallback::Callback(_, node_callback) => {
2560 node_callback(node, next_stmt_id);
2561 }
2562 }
2563
2564 *next_stmt_id += 1;
2565
2566 ident_stack.push(source_ident);
2567 }
2568
2569 HydroNode::CycleSource { cycle_id, .. } => {
2570 let ident = cycle_id.as_ident();
2571
2572 match builders_or_callback {
2573 BuildersOrCallback::Builders(_) => {}
2574 BuildersOrCallback::Callback(_, node_callback) => {
2575 node_callback(node, next_stmt_id);
2576 }
2577 }
2578
2579 *next_stmt_id += 1;
2581
2582 ident_stack.push(ident);
2583 }
2584
2585 HydroNode::Tee { inner, .. } => {
2586 let ret_ident = if let Some(teed_from) =
2587 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2588 {
2589 match builders_or_callback {
2590 BuildersOrCallback::Builders(_) => {}
2591 BuildersOrCallback::Callback(_, node_callback) => {
2592 node_callback(node, next_stmt_id);
2593 }
2594 }
2595
2596 teed_from.clone()
2597 } else {
2598 let inner_ident = ident_stack.pop().unwrap();
2601
2602 let tee_ident =
2603 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2604
2605 built_tees.insert(
2606 inner.0.as_ref() as *const RefCell<HydroNode>,
2607 tee_ident.clone(),
2608 );
2609
2610 match builders_or_callback {
2611 BuildersOrCallback::Builders(graph_builders) => {
2612 let builder = graph_builders.get_dfir_mut(&out_location);
2613 builder.add_dfir(
2614 parse_quote! {
2615 #tee_ident = #inner_ident -> tee();
2616 },
2617 None,
2618 Some(&next_stmt_id.to_string()),
2619 );
2620 }
2621 BuildersOrCallback::Callback(_, node_callback) => {
2622 node_callback(node, next_stmt_id);
2623 }
2624 }
2625
2626 tee_ident
2627 };
2628
2629 *next_stmt_id += 1;
2633 ident_stack.push(ret_ident);
2634 }
2635
2636 HydroNode::Chain { .. } => {
2637 let second_ident = ident_stack.pop().unwrap();
2639 let first_ident = ident_stack.pop().unwrap();
2640
2641 let chain_ident =
2642 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2643
2644 match builders_or_callback {
2645 BuildersOrCallback::Builders(graph_builders) => {
2646 let builder = graph_builders.get_dfir_mut(&out_location);
2647 builder.add_dfir(
2648 parse_quote! {
2649 #chain_ident = chain();
2650 #first_ident -> [0]#chain_ident;
2651 #second_ident -> [1]#chain_ident;
2652 },
2653 None,
2654 Some(&next_stmt_id.to_string()),
2655 );
2656 }
2657 BuildersOrCallback::Callback(_, node_callback) => {
2658 node_callback(node, next_stmt_id);
2659 }
2660 }
2661
2662 *next_stmt_id += 1;
2663
2664 ident_stack.push(chain_ident);
2665 }
2666
2667 HydroNode::ChainFirst { .. } => {
2668 let second_ident = ident_stack.pop().unwrap();
2669 let first_ident = ident_stack.pop().unwrap();
2670
2671 let chain_ident =
2672 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2673
2674 match builders_or_callback {
2675 BuildersOrCallback::Builders(graph_builders) => {
2676 let builder = graph_builders.get_dfir_mut(&out_location);
2677 builder.add_dfir(
2678 parse_quote! {
2679 #chain_ident = chain_first_n(1);
2680 #first_ident -> [0]#chain_ident;
2681 #second_ident -> [1]#chain_ident;
2682 },
2683 None,
2684 Some(&next_stmt_id.to_string()),
2685 );
2686 }
2687 BuildersOrCallback::Callback(_, node_callback) => {
2688 node_callback(node, next_stmt_id);
2689 }
2690 }
2691
2692 *next_stmt_id += 1;
2693
2694 ident_stack.push(chain_ident);
2695 }
2696
2697 HydroNode::CrossSingleton { right, .. } => {
2698 let right_ident = ident_stack.pop().unwrap();
2699 let left_ident = ident_stack.pop().unwrap();
2700
2701 let cross_ident =
2702 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2703
2704 match builders_or_callback {
2705 BuildersOrCallback::Builders(graph_builders) => {
2706 let builder = graph_builders.get_dfir_mut(&out_location);
2707
2708 if right.metadata().location_id.is_top_level()
2709 && right.metadata().collection_kind.is_bounded()
2710 {
2711 builder.add_dfir(
2712 parse_quote! {
2713 #cross_ident = cross_singleton();
2714 #left_ident -> [input]#cross_ident;
2715 #right_ident -> persist::<'static>() -> [single]#cross_ident;
2716 },
2717 None,
2718 Some(&next_stmt_id.to_string()),
2719 );
2720 } else {
2721 builder.add_dfir(
2722 parse_quote! {
2723 #cross_ident = cross_singleton();
2724 #left_ident -> [input]#cross_ident;
2725 #right_ident -> [single]#cross_ident;
2726 },
2727 None,
2728 Some(&next_stmt_id.to_string()),
2729 );
2730 }
2731 }
2732 BuildersOrCallback::Callback(_, node_callback) => {
2733 node_callback(node, next_stmt_id);
2734 }
2735 }
2736
2737 *next_stmt_id += 1;
2738
2739 ident_stack.push(cross_ident);
2740 }
2741
2742 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2743 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2744 parse_quote!(cross_join_multiset)
2745 } else {
2746 parse_quote!(join_multiset)
2747 };
2748
2749 let (HydroNode::CrossProduct { left, right, .. }
2750 | HydroNode::Join { left, right, .. }) = node
2751 else {
2752 unreachable!()
2753 };
2754
2755 let is_top_level = left.metadata().location_id.is_top_level()
2756 && right.metadata().location_id.is_top_level();
2757 let left_lifetime = if left.metadata().location_id.is_top_level() {
2758 quote!('static)
2759 } else {
2760 quote!('tick)
2761 };
2762
2763 let right_lifetime = if right.metadata().location_id.is_top_level() {
2764 quote!('static)
2765 } else {
2766 quote!('tick)
2767 };
2768
2769 let right_ident = ident_stack.pop().unwrap();
2770 let left_ident = ident_stack.pop().unwrap();
2771
2772 let stream_ident =
2773 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2774
2775 match builders_or_callback {
2776 BuildersOrCallback::Builders(graph_builders) => {
2777 let builder = graph_builders.get_dfir_mut(&out_location);
2778 builder.add_dfir(
2779 if is_top_level {
2780 parse_quote! {
2783 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2784 #left_ident -> [0]#stream_ident;
2785 #right_ident -> [1]#stream_ident;
2786 }
2787 } else {
2788 parse_quote! {
2789 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2790 #left_ident -> [0]#stream_ident;
2791 #right_ident -> [1]#stream_ident;
2792 }
2793 }
2794 ,
2795 None,
2796 Some(&next_stmt_id.to_string()),
2797 );
2798 }
2799 BuildersOrCallback::Callback(_, node_callback) => {
2800 node_callback(node, next_stmt_id);
2801 }
2802 }
2803
2804 *next_stmt_id += 1;
2805
2806 ident_stack.push(stream_ident);
2807 }
2808
2809 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2810 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2811 parse_quote!(difference)
2812 } else {
2813 parse_quote!(anti_join)
2814 };
2815
2816 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2817 node
2818 else {
2819 unreachable!()
2820 };
2821
2822 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2823 quote!('static)
2824 } else {
2825 quote!('tick)
2826 };
2827
2828 let neg_ident = ident_stack.pop().unwrap();
2829 let pos_ident = ident_stack.pop().unwrap();
2830
2831 let stream_ident =
2832 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2833
2834 match builders_or_callback {
2835 BuildersOrCallback::Builders(graph_builders) => {
2836 let builder = graph_builders.get_dfir_mut(&out_location);
2837 builder.add_dfir(
2838 parse_quote! {
2839 #stream_ident = #operator::<'tick, #neg_lifetime>();
2840 #pos_ident -> [pos]#stream_ident;
2841 #neg_ident -> [neg]#stream_ident;
2842 },
2843 None,
2844 Some(&next_stmt_id.to_string()),
2845 );
2846 }
2847 BuildersOrCallback::Callback(_, node_callback) => {
2848 node_callback(node, next_stmt_id);
2849 }
2850 }
2851
2852 *next_stmt_id += 1;
2853
2854 ident_stack.push(stream_ident);
2855 }
2856
2857 HydroNode::ResolveFutures { .. } => {
2858 let input_ident = ident_stack.pop().unwrap();
2859
2860 let futures_ident =
2861 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2862
2863 match builders_or_callback {
2864 BuildersOrCallback::Builders(graph_builders) => {
2865 let builder = graph_builders.get_dfir_mut(&out_location);
2866 builder.add_dfir(
2867 parse_quote! {
2868 #futures_ident = #input_ident -> resolve_futures();
2869 },
2870 None,
2871 Some(&next_stmt_id.to_string()),
2872 );
2873 }
2874 BuildersOrCallback::Callback(_, node_callback) => {
2875 node_callback(node, next_stmt_id);
2876 }
2877 }
2878
2879 *next_stmt_id += 1;
2880
2881 ident_stack.push(futures_ident);
2882 }
2883
2884 HydroNode::ResolveFuturesOrdered { .. } => {
2885 let input_ident = ident_stack.pop().unwrap();
2886
2887 let futures_ident =
2888 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2889
2890 match builders_or_callback {
2891 BuildersOrCallback::Builders(graph_builders) => {
2892 let builder = graph_builders.get_dfir_mut(&out_location);
2893 builder.add_dfir(
2894 parse_quote! {
2895 #futures_ident = #input_ident -> resolve_futures_ordered();
2896 },
2897 None,
2898 Some(&next_stmt_id.to_string()),
2899 );
2900 }
2901 BuildersOrCallback::Callback(_, node_callback) => {
2902 node_callback(node, next_stmt_id);
2903 }
2904 }
2905
2906 *next_stmt_id += 1;
2907
2908 ident_stack.push(futures_ident);
2909 }
2910
2911 HydroNode::Map { f, .. } => {
2912 let input_ident = ident_stack.pop().unwrap();
2913
2914 let map_ident =
2915 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2916
2917 match builders_or_callback {
2918 BuildersOrCallback::Builders(graph_builders) => {
2919 let builder = graph_builders.get_dfir_mut(&out_location);
2920 builder.add_dfir(
2921 parse_quote! {
2922 #map_ident = #input_ident -> map(#f);
2923 },
2924 None,
2925 Some(&next_stmt_id.to_string()),
2926 );
2927 }
2928 BuildersOrCallback::Callback(_, node_callback) => {
2929 node_callback(node, next_stmt_id);
2930 }
2931 }
2932
2933 *next_stmt_id += 1;
2934
2935 ident_stack.push(map_ident);
2936 }
2937
2938 HydroNode::FlatMap { f, .. } => {
2939 let input_ident = ident_stack.pop().unwrap();
2940
2941 let flat_map_ident =
2942 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2943
2944 match builders_or_callback {
2945 BuildersOrCallback::Builders(graph_builders) => {
2946 let builder = graph_builders.get_dfir_mut(&out_location);
2947 builder.add_dfir(
2948 parse_quote! {
2949 #flat_map_ident = #input_ident -> flat_map(#f);
2950 },
2951 None,
2952 Some(&next_stmt_id.to_string()),
2953 );
2954 }
2955 BuildersOrCallback::Callback(_, node_callback) => {
2956 node_callback(node, next_stmt_id);
2957 }
2958 }
2959
2960 *next_stmt_id += 1;
2961
2962 ident_stack.push(flat_map_ident);
2963 }
2964
2965 HydroNode::Filter { f, .. } => {
2966 let input_ident = ident_stack.pop().unwrap();
2967
2968 let filter_ident =
2969 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2970
2971 match builders_or_callback {
2972 BuildersOrCallback::Builders(graph_builders) => {
2973 let builder = graph_builders.get_dfir_mut(&out_location);
2974 builder.add_dfir(
2975 parse_quote! {
2976 #filter_ident = #input_ident -> filter(#f);
2977 },
2978 None,
2979 Some(&next_stmt_id.to_string()),
2980 );
2981 }
2982 BuildersOrCallback::Callback(_, node_callback) => {
2983 node_callback(node, next_stmt_id);
2984 }
2985 }
2986
2987 *next_stmt_id += 1;
2988
2989 ident_stack.push(filter_ident);
2990 }
2991
2992 HydroNode::FilterMap { f, .. } => {
2993 let input_ident = ident_stack.pop().unwrap();
2994
2995 let filter_map_ident =
2996 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2997
2998 match builders_or_callback {
2999 BuildersOrCallback::Builders(graph_builders) => {
3000 let builder = graph_builders.get_dfir_mut(&out_location);
3001 builder.add_dfir(
3002 parse_quote! {
3003 #filter_map_ident = #input_ident -> filter_map(#f);
3004 },
3005 None,
3006 Some(&next_stmt_id.to_string()),
3007 );
3008 }
3009 BuildersOrCallback::Callback(_, node_callback) => {
3010 node_callback(node, next_stmt_id);
3011 }
3012 }
3013
3014 *next_stmt_id += 1;
3015
3016 ident_stack.push(filter_map_ident);
3017 }
3018
3019 HydroNode::Sort { .. } => {
3020 let input_ident = ident_stack.pop().unwrap();
3021
3022 let sort_ident =
3023 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3024
3025 match builders_or_callback {
3026 BuildersOrCallback::Builders(graph_builders) => {
3027 let builder = graph_builders.get_dfir_mut(&out_location);
3028 builder.add_dfir(
3029 parse_quote! {
3030 #sort_ident = #input_ident -> sort();
3031 },
3032 None,
3033 Some(&next_stmt_id.to_string()),
3034 );
3035 }
3036 BuildersOrCallback::Callback(_, node_callback) => {
3037 node_callback(node, next_stmt_id);
3038 }
3039 }
3040
3041 *next_stmt_id += 1;
3042
3043 ident_stack.push(sort_ident);
3044 }
3045
3046 HydroNode::DeferTick { .. } => {
3047 let input_ident = ident_stack.pop().unwrap();
3048
3049 let defer_tick_ident =
3050 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3051
3052 match builders_or_callback {
3053 BuildersOrCallback::Builders(graph_builders) => {
3054 let builder = graph_builders.get_dfir_mut(&out_location);
3055 builder.add_dfir(
3056 parse_quote! {
3057 #defer_tick_ident = #input_ident -> defer_tick_lazy();
3058 },
3059 None,
3060 Some(&next_stmt_id.to_string()),
3061 );
3062 }
3063 BuildersOrCallback::Callback(_, node_callback) => {
3064 node_callback(node, next_stmt_id);
3065 }
3066 }
3067
3068 *next_stmt_id += 1;
3069
3070 ident_stack.push(defer_tick_ident);
3071 }
3072
3073 HydroNode::Enumerate { input, .. } => {
3074 let input_ident = ident_stack.pop().unwrap();
3075
3076 let enumerate_ident =
3077 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3078
3079 match builders_or_callback {
3080 BuildersOrCallback::Builders(graph_builders) => {
3081 let builder = graph_builders.get_dfir_mut(&out_location);
3082 let lifetime = if input.metadata().location_id.is_top_level() {
3083 quote!('static)
3084 } else {
3085 quote!('tick)
3086 };
3087 builder.add_dfir(
3088 parse_quote! {
3089 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
3090 },
3091 None,
3092 Some(&next_stmt_id.to_string()),
3093 );
3094 }
3095 BuildersOrCallback::Callback(_, node_callback) => {
3096 node_callback(node, next_stmt_id);
3097 }
3098 }
3099
3100 *next_stmt_id += 1;
3101
3102 ident_stack.push(enumerate_ident);
3103 }
3104
3105 HydroNode::Inspect { f, .. } => {
3106 let input_ident = ident_stack.pop().unwrap();
3107
3108 let inspect_ident =
3109 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3110
3111 match builders_or_callback {
3112 BuildersOrCallback::Builders(graph_builders) => {
3113 let builder = graph_builders.get_dfir_mut(&out_location);
3114 builder.add_dfir(
3115 parse_quote! {
3116 #inspect_ident = #input_ident -> inspect(#f);
3117 },
3118 None,
3119 Some(&next_stmt_id.to_string()),
3120 );
3121 }
3122 BuildersOrCallback::Callback(_, node_callback) => {
3123 node_callback(node, next_stmt_id);
3124 }
3125 }
3126
3127 *next_stmt_id += 1;
3128
3129 ident_stack.push(inspect_ident);
3130 }
3131
3132 HydroNode::Unique { input, .. } => {
3133 let input_ident = ident_stack.pop().unwrap();
3134
3135 let unique_ident =
3136 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3137
3138 match builders_or_callback {
3139 BuildersOrCallback::Builders(graph_builders) => {
3140 let builder = graph_builders.get_dfir_mut(&out_location);
3141 let lifetime = if input.metadata().location_id.is_top_level() {
3142 quote!('static)
3143 } else {
3144 quote!('tick)
3145 };
3146
3147 builder.add_dfir(
3148 parse_quote! {
3149 #unique_ident = #input_ident -> unique::<#lifetime>();
3150 },
3151 None,
3152 Some(&next_stmt_id.to_string()),
3153 );
3154 }
3155 BuildersOrCallback::Callback(_, node_callback) => {
3156 node_callback(node, next_stmt_id);
3157 }
3158 }
3159
3160 *next_stmt_id += 1;
3161
3162 ident_stack.push(unique_ident);
3163 }
3164
3165 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3166 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3167 if input.metadata().location_id.is_top_level()
3168 && input.metadata().collection_kind.is_bounded()
3169 {
3170 parse_quote!(fold_no_replay)
3171 } else {
3172 parse_quote!(fold)
3173 }
3174 } else if matches!(node, HydroNode::Scan { .. }) {
3175 parse_quote!(scan)
3176 } else if let HydroNode::FoldKeyed { input, .. } = node {
3177 if input.metadata().location_id.is_top_level()
3178 && input.metadata().collection_kind.is_bounded()
3179 {
3180 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3181 } else {
3182 parse_quote!(fold_keyed)
3183 }
3184 } else {
3185 unreachable!()
3186 };
3187
3188 let (HydroNode::Fold { input, .. }
3189 | HydroNode::FoldKeyed { input, .. }
3190 | HydroNode::Scan { input, .. }) = node
3191 else {
3192 unreachable!()
3193 };
3194
3195 let lifetime = if input.metadata().location_id.is_top_level() {
3196 quote!('static)
3197 } else {
3198 quote!('tick)
3199 };
3200
3201 let input_ident = ident_stack.pop().unwrap();
3202
3203 let (HydroNode::Fold { init, acc, .. }
3204 | HydroNode::FoldKeyed { init, acc, .. }
3205 | HydroNode::Scan { init, acc, .. }) = &*node
3206 else {
3207 unreachable!()
3208 };
3209
3210 let fold_ident =
3211 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3212
3213 match builders_or_callback {
3214 BuildersOrCallback::Builders(graph_builders) => {
3215 if matches!(node, HydroNode::Fold { .. })
3216 && node.metadata().location_id.is_top_level()
3217 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3218 && graph_builders.singleton_intermediates()
3219 && !node.metadata().collection_kind.is_bounded()
3220 {
3221 let builder = graph_builders.get_dfir_mut(&out_location);
3222
3223 let acc: syn::Expr = parse_quote!({
3224 let mut __inner = #acc;
3225 move |__state, __value| {
3226 __inner(__state, __value);
3227 Some(__state.clone())
3228 }
3229 });
3230
3231 builder.add_dfir(
3232 parse_quote! {
3233 source_iter([(#init)()]) -> [0]#fold_ident;
3234 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3235 #fold_ident = chain();
3236 },
3237 None,
3238 Some(&next_stmt_id.to_string()),
3239 );
3240 } else if matches!(node, HydroNode::FoldKeyed { .. })
3241 && node.metadata().location_id.is_top_level()
3242 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3243 && graph_builders.singleton_intermediates()
3244 && !node.metadata().collection_kind.is_bounded()
3245 {
3246 let builder = graph_builders.get_dfir_mut(&out_location);
3247
3248 let acc: syn::Expr = parse_quote!({
3249 let mut __init = #init;
3250 let mut __inner = #acc;
3251 move |__state, __kv: (_, _)| {
3252 let __state = __state
3254 .entry(::std::clone::Clone::clone(&__kv.0))
3255 .or_insert_with(|| (__init)());
3256 __inner(__state, __kv.1);
3257 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3258 }
3259 });
3260
3261 builder.add_dfir(
3262 parse_quote! {
3263 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3264 },
3265 None,
3266 Some(&next_stmt_id.to_string()),
3267 );
3268 } else {
3269 let builder = graph_builders.get_dfir_mut(&out_location);
3270 builder.add_dfir(
3271 parse_quote! {
3272 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3273 },
3274 None,
3275 Some(&next_stmt_id.to_string()),
3276 );
3277 }
3278 }
3279 BuildersOrCallback::Callback(_, node_callback) => {
3280 node_callback(node, next_stmt_id);
3281 }
3282 }
3283
3284 *next_stmt_id += 1;
3285
3286 ident_stack.push(fold_ident);
3287 }
3288
3289 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3290 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3291 if input.metadata().location_id.is_top_level()
3292 && input.metadata().collection_kind.is_bounded()
3293 {
3294 parse_quote!(reduce_no_replay)
3295 } else {
3296 parse_quote!(reduce)
3297 }
3298 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3299 if input.metadata().location_id.is_top_level()
3300 && input.metadata().collection_kind.is_bounded()
3301 {
3302 todo!(
3303 "Calling keyed reduce on a top-level bounded collection is not supported"
3304 )
3305 } else {
3306 parse_quote!(reduce_keyed)
3307 }
3308 } else {
3309 unreachable!()
3310 };
3311
3312 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3313 else {
3314 unreachable!()
3315 };
3316
3317 let lifetime = if input.metadata().location_id.is_top_level() {
3318 quote!('static)
3319 } else {
3320 quote!('tick)
3321 };
3322
3323 let input_ident = ident_stack.pop().unwrap();
3324
3325 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3326 else {
3327 unreachable!()
3328 };
3329
3330 let reduce_ident =
3331 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3332
3333 match builders_or_callback {
3334 BuildersOrCallback::Builders(graph_builders) => {
3335 if matches!(node, HydroNode::Reduce { .. })
3336 && node.metadata().location_id.is_top_level()
3337 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3338 && graph_builders.singleton_intermediates()
3339 && !node.metadata().collection_kind.is_bounded()
3340 {
3341 todo!(
3342 "Reduce with optional intermediates is not yet supported in simulator"
3343 );
3344 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3345 && node.metadata().location_id.is_top_level()
3346 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3347 && graph_builders.singleton_intermediates()
3348 && !node.metadata().collection_kind.is_bounded()
3349 {
3350 todo!(
3351 "Reduce keyed with optional intermediates is not yet supported in simulator"
3352 );
3353 } else {
3354 let builder = graph_builders.get_dfir_mut(&out_location);
3355 builder.add_dfir(
3356 parse_quote! {
3357 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3358 },
3359 None,
3360 Some(&next_stmt_id.to_string()),
3361 );
3362 }
3363 }
3364 BuildersOrCallback::Callback(_, node_callback) => {
3365 node_callback(node, next_stmt_id);
3366 }
3367 }
3368
3369 *next_stmt_id += 1;
3370
3371 ident_stack.push(reduce_ident);
3372 }
3373
3374 HydroNode::ReduceKeyedWatermark {
3375 f,
3376 input,
3377 metadata,
3378 ..
3379 } => {
3380 let lifetime = if input.metadata().location_id.is_top_level() {
3381 quote!('static)
3382 } else {
3383 quote!('tick)
3384 };
3385
3386 let watermark_ident = ident_stack.pop().unwrap();
3388 let input_ident = ident_stack.pop().unwrap();
3389
3390 let chain_ident = syn::Ident::new(
3391 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3392 Span::call_site(),
3393 );
3394
3395 let fold_ident =
3396 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3397
3398 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3399 && input.metadata().collection_kind.is_bounded()
3400 {
3401 parse_quote!(fold_no_replay)
3402 } else {
3403 parse_quote!(fold)
3404 };
3405
3406 match builders_or_callback {
3407 BuildersOrCallback::Builders(graph_builders) => {
3408 if metadata.location_id.is_top_level()
3409 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3410 && graph_builders.singleton_intermediates()
3411 && !metadata.collection_kind.is_bounded()
3412 {
3413 todo!(
3414 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3415 )
3416 } else {
3417 let builder = graph_builders.get_dfir_mut(&out_location);
3418 builder.add_dfir(
3419 parse_quote! {
3420 #chain_ident = chain();
3421 #input_ident
3422 -> map(|x| (Some(x), None))
3423 -> [0]#chain_ident;
3424 #watermark_ident
3425 -> map(|watermark| (None, Some(watermark)))
3426 -> [1]#chain_ident;
3427
3428 #fold_ident = #chain_ident
3429 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3430 let __reduce_keyed_fn = #f;
3431 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3432 if let Some((k, v)) = opt_payload {
3433 if let Some(curr_watermark) = *opt_curr_watermark {
3434 if k <= curr_watermark {
3435 return;
3436 }
3437 }
3438 match map.entry(k) {
3439 ::std::collections::hash_map::Entry::Vacant(e) => {
3440 e.insert(v);
3441 }
3442 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3443 __reduce_keyed_fn(e.get_mut(), v);
3444 }
3445 }
3446 } else {
3447 let watermark = opt_watermark.unwrap();
3448 if let Some(curr_watermark) = *opt_curr_watermark {
3449 if watermark <= curr_watermark {
3450 return;
3451 }
3452 }
3453 *opt_curr_watermark = opt_watermark;
3454 map.retain(|k, _| *k > watermark);
3455 }
3456 }
3457 })
3458 -> flat_map(|(map, _curr_watermark)| map);
3459 },
3460 None,
3461 Some(&next_stmt_id.to_string()),
3462 );
3463 }
3464 }
3465 BuildersOrCallback::Callback(_, node_callback) => {
3466 node_callback(node, next_stmt_id);
3467 }
3468 }
3469
3470 *next_stmt_id += 1;
3471
3472 ident_stack.push(fold_ident);
3473 }
3474
3475 HydroNode::Network {
3476 networking_info,
3477 serialize_fn: serialize_pipeline,
3478 instantiate_fn,
3479 deserialize_fn: deserialize_pipeline,
3480 input,
3481 ..
3482 } => {
3483 let input_ident = ident_stack.pop().unwrap();
3484
3485 let receiver_stream_ident =
3486 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3487
3488 match builders_or_callback {
3489 BuildersOrCallback::Builders(graph_builders) => {
3490 let (sink_expr, source_expr) = match instantiate_fn {
3491 DebugInstantiate::Building => (
3492 syn::parse_quote!(DUMMY_SINK),
3493 syn::parse_quote!(DUMMY_SOURCE),
3494 ),
3495
3496 DebugInstantiate::Finalized(finalized) => {
3497 (finalized.sink.clone(), finalized.source.clone())
3498 }
3499 };
3500
3501 graph_builders.create_network(
3502 &input.metadata().location_id,
3503 &out_location,
3504 input_ident,
3505 &receiver_stream_ident,
3506 serialize_pipeline.as_ref(),
3507 sink_expr,
3508 source_expr,
3509 deserialize_pipeline.as_ref(),
3510 *next_stmt_id,
3511 networking_info,
3512 );
3513 }
3514 BuildersOrCallback::Callback(_, node_callback) => {
3515 node_callback(node, next_stmt_id);
3516 }
3517 }
3518
3519 *next_stmt_id += 1;
3520
3521 ident_stack.push(receiver_stream_ident);
3522 }
3523
3524 HydroNode::ExternalInput {
3525 instantiate_fn,
3526 deserialize_fn: deserialize_pipeline,
3527 ..
3528 } => {
3529 let receiver_stream_ident =
3530 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3531
3532 match builders_or_callback {
3533 BuildersOrCallback::Builders(graph_builders) => {
3534 let (_, source_expr) = match instantiate_fn {
3535 DebugInstantiate::Building => (
3536 syn::parse_quote!(DUMMY_SINK),
3537 syn::parse_quote!(DUMMY_SOURCE),
3538 ),
3539
3540 DebugInstantiate::Finalized(finalized) => {
3541 (finalized.sink.clone(), finalized.source.clone())
3542 }
3543 };
3544
3545 graph_builders.create_external_source(
3546 &out_location,
3547 source_expr,
3548 &receiver_stream_ident,
3549 deserialize_pipeline.as_ref(),
3550 *next_stmt_id,
3551 );
3552 }
3553 BuildersOrCallback::Callback(_, node_callback) => {
3554 node_callback(node, next_stmt_id);
3555 }
3556 }
3557
3558 *next_stmt_id += 1;
3559
3560 ident_stack.push(receiver_stream_ident);
3561 }
3562
3563 HydroNode::Counter {
3564 tag,
3565 duration,
3566 prefix,
3567 ..
3568 } => {
3569 let input_ident = ident_stack.pop().unwrap();
3570
3571 let counter_ident =
3572 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3573
3574 match builders_or_callback {
3575 BuildersOrCallback::Builders(graph_builders) => {
3576 let arg = format!("{}({})", prefix, tag);
3577 let builder = graph_builders.get_dfir_mut(&out_location);
3578 builder.add_dfir(
3579 parse_quote! {
3580 #counter_ident = #input_ident -> _counter(#arg, #duration);
3581 },
3582 None,
3583 Some(&next_stmt_id.to_string()),
3584 );
3585 }
3586 BuildersOrCallback::Callback(_, node_callback) => {
3587 node_callback(node, next_stmt_id);
3588 }
3589 }
3590
3591 *next_stmt_id += 1;
3592
3593 ident_stack.push(counter_ident);
3594 }
3595 }
3596 },
3597 seen_tees,
3598 false,
3599 );
3600
3601 ident_stack
3602 .pop()
3603 .expect("ident_stack should have exactly one element after traversal")
3604 }
3605
3606 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3607 match self {
3608 HydroNode::Placeholder => {
3609 panic!()
3610 }
3611 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3612 HydroNode::Source { source, .. } => match source {
3613 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3614 HydroSource::ExternalNetwork()
3615 | HydroSource::Spin()
3616 | HydroSource::ClusterMembers(_, _)
3617 | HydroSource::Embedded(_) => {} },
3619 HydroNode::SingletonSource { value, .. } => {
3620 transform(value);
3621 }
3622 HydroNode::CycleSource { .. }
3623 | HydroNode::Tee { .. }
3624 | HydroNode::YieldConcat { .. }
3625 | HydroNode::BeginAtomic { .. }
3626 | HydroNode::EndAtomic { .. }
3627 | HydroNode::Batch { .. }
3628 | HydroNode::Chain { .. }
3629 | HydroNode::ChainFirst { .. }
3630 | HydroNode::CrossProduct { .. }
3631 | HydroNode::CrossSingleton { .. }
3632 | HydroNode::ResolveFutures { .. }
3633 | HydroNode::ResolveFuturesOrdered { .. }
3634 | HydroNode::Join { .. }
3635 | HydroNode::Difference { .. }
3636 | HydroNode::AntiJoin { .. }
3637 | HydroNode::DeferTick { .. }
3638 | HydroNode::Enumerate { .. }
3639 | HydroNode::Unique { .. }
3640 | HydroNode::Sort { .. } => {}
3641 HydroNode::Map { f, .. }
3642 | HydroNode::FlatMap { f, .. }
3643 | HydroNode::Filter { f, .. }
3644 | HydroNode::FilterMap { f, .. }
3645 | HydroNode::Inspect { f, .. }
3646 | HydroNode::Reduce { f, .. }
3647 | HydroNode::ReduceKeyed { f, .. }
3648 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3649 transform(f);
3650 }
3651 HydroNode::Fold { init, acc, .. }
3652 | HydroNode::Scan { init, acc, .. }
3653 | HydroNode::FoldKeyed { init, acc, .. } => {
3654 transform(init);
3655 transform(acc);
3656 }
3657 HydroNode::Network {
3658 serialize_fn,
3659 deserialize_fn,
3660 ..
3661 } => {
3662 if let Some(serialize_fn) = serialize_fn {
3663 transform(serialize_fn);
3664 }
3665 if let Some(deserialize_fn) = deserialize_fn {
3666 transform(deserialize_fn);
3667 }
3668 }
3669 HydroNode::ExternalInput { deserialize_fn, .. } => {
3670 if let Some(deserialize_fn) = deserialize_fn {
3671 transform(deserialize_fn);
3672 }
3673 }
3674 HydroNode::Counter { duration, .. } => {
3675 transform(duration);
3676 }
3677 }
3678 }
3679
3680 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3681 &self.metadata().op
3682 }
3683
3684 pub fn metadata(&self) -> &HydroIrMetadata {
3685 match self {
3686 HydroNode::Placeholder => {
3687 panic!()
3688 }
3689 HydroNode::Cast { metadata, .. } => metadata,
3690 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3691 HydroNode::Source { metadata, .. } => metadata,
3692 HydroNode::SingletonSource { metadata, .. } => metadata,
3693 HydroNode::CycleSource { metadata, .. } => metadata,
3694 HydroNode::Tee { metadata, .. } => metadata,
3695 HydroNode::YieldConcat { metadata, .. } => metadata,
3696 HydroNode::BeginAtomic { metadata, .. } => metadata,
3697 HydroNode::EndAtomic { metadata, .. } => metadata,
3698 HydroNode::Batch { metadata, .. } => metadata,
3699 HydroNode::Chain { metadata, .. } => metadata,
3700 HydroNode::ChainFirst { metadata, .. } => metadata,
3701 HydroNode::CrossProduct { metadata, .. } => metadata,
3702 HydroNode::CrossSingleton { metadata, .. } => metadata,
3703 HydroNode::Join { metadata, .. } => metadata,
3704 HydroNode::Difference { metadata, .. } => metadata,
3705 HydroNode::AntiJoin { metadata, .. } => metadata,
3706 HydroNode::ResolveFutures { metadata, .. } => metadata,
3707 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3708 HydroNode::Map { metadata, .. } => metadata,
3709 HydroNode::FlatMap { metadata, .. } => metadata,
3710 HydroNode::Filter { metadata, .. } => metadata,
3711 HydroNode::FilterMap { metadata, .. } => metadata,
3712 HydroNode::DeferTick { metadata, .. } => metadata,
3713 HydroNode::Enumerate { metadata, .. } => metadata,
3714 HydroNode::Inspect { metadata, .. } => metadata,
3715 HydroNode::Unique { metadata, .. } => metadata,
3716 HydroNode::Sort { metadata, .. } => metadata,
3717 HydroNode::Scan { metadata, .. } => metadata,
3718 HydroNode::Fold { metadata, .. } => metadata,
3719 HydroNode::FoldKeyed { metadata, .. } => metadata,
3720 HydroNode::Reduce { metadata, .. } => metadata,
3721 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3722 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3723 HydroNode::ExternalInput { metadata, .. } => metadata,
3724 HydroNode::Network { metadata, .. } => metadata,
3725 HydroNode::Counter { metadata, .. } => metadata,
3726 }
3727 }
3728
3729 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3730 &mut self.metadata_mut().op
3731 }
3732
3733 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3734 match self {
3735 HydroNode::Placeholder => {
3736 panic!()
3737 }
3738 HydroNode::Cast { metadata, .. } => metadata,
3739 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3740 HydroNode::Source { metadata, .. } => metadata,
3741 HydroNode::SingletonSource { metadata, .. } => metadata,
3742 HydroNode::CycleSource { metadata, .. } => metadata,
3743 HydroNode::Tee { metadata, .. } => metadata,
3744 HydroNode::YieldConcat { metadata, .. } => metadata,
3745 HydroNode::BeginAtomic { metadata, .. } => metadata,
3746 HydroNode::EndAtomic { metadata, .. } => metadata,
3747 HydroNode::Batch { metadata, .. } => metadata,
3748 HydroNode::Chain { metadata, .. } => metadata,
3749 HydroNode::ChainFirst { metadata, .. } => metadata,
3750 HydroNode::CrossProduct { metadata, .. } => metadata,
3751 HydroNode::CrossSingleton { metadata, .. } => metadata,
3752 HydroNode::Join { metadata, .. } => metadata,
3753 HydroNode::Difference { metadata, .. } => metadata,
3754 HydroNode::AntiJoin { metadata, .. } => metadata,
3755 HydroNode::ResolveFutures { metadata, .. } => metadata,
3756 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3757 HydroNode::Map { metadata, .. } => metadata,
3758 HydroNode::FlatMap { metadata, .. } => metadata,
3759 HydroNode::Filter { metadata, .. } => metadata,
3760 HydroNode::FilterMap { metadata, .. } => metadata,
3761 HydroNode::DeferTick { metadata, .. } => metadata,
3762 HydroNode::Enumerate { metadata, .. } => metadata,
3763 HydroNode::Inspect { metadata, .. } => metadata,
3764 HydroNode::Unique { metadata, .. } => metadata,
3765 HydroNode::Sort { metadata, .. } => metadata,
3766 HydroNode::Scan { metadata, .. } => metadata,
3767 HydroNode::Fold { metadata, .. } => metadata,
3768 HydroNode::FoldKeyed { metadata, .. } => metadata,
3769 HydroNode::Reduce { metadata, .. } => metadata,
3770 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3771 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3772 HydroNode::ExternalInput { metadata, .. } => metadata,
3773 HydroNode::Network { metadata, .. } => metadata,
3774 HydroNode::Counter { metadata, .. } => metadata,
3775 }
3776 }
3777
3778 pub fn input(&self) -> Vec<&HydroNode> {
3779 match self {
3780 HydroNode::Placeholder => {
3781 panic!()
3782 }
3783 HydroNode::Source { .. }
3784 | HydroNode::SingletonSource { .. }
3785 | HydroNode::ExternalInput { .. }
3786 | HydroNode::CycleSource { .. }
3787 | HydroNode::Tee { .. } => {
3788 vec![]
3790 }
3791 HydroNode::Cast { inner, .. }
3792 | HydroNode::ObserveNonDet { inner, .. }
3793 | HydroNode::YieldConcat { inner, .. }
3794 | HydroNode::BeginAtomic { inner, .. }
3795 | HydroNode::EndAtomic { inner, .. }
3796 | HydroNode::Batch { inner, .. } => {
3797 vec![inner]
3798 }
3799 HydroNode::Chain { first, second, .. } => {
3800 vec![first, second]
3801 }
3802 HydroNode::ChainFirst { first, second, .. } => {
3803 vec![first, second]
3804 }
3805 HydroNode::CrossProduct { left, right, .. }
3806 | HydroNode::CrossSingleton { left, right, .. }
3807 | HydroNode::Join { left, right, .. } => {
3808 vec![left, right]
3809 }
3810 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3811 vec![pos, neg]
3812 }
3813 HydroNode::Map { input, .. }
3814 | HydroNode::FlatMap { input, .. }
3815 | HydroNode::Filter { input, .. }
3816 | HydroNode::FilterMap { input, .. }
3817 | HydroNode::Sort { input, .. }
3818 | HydroNode::DeferTick { input, .. }
3819 | HydroNode::Enumerate { input, .. }
3820 | HydroNode::Inspect { input, .. }
3821 | HydroNode::Unique { input, .. }
3822 | HydroNode::Network { input, .. }
3823 | HydroNode::Counter { input, .. }
3824 | HydroNode::ResolveFutures { input, .. }
3825 | HydroNode::ResolveFuturesOrdered { input, .. }
3826 | HydroNode::Fold { input, .. }
3827 | HydroNode::FoldKeyed { input, .. }
3828 | HydroNode::Reduce { input, .. }
3829 | HydroNode::ReduceKeyed { input, .. }
3830 | HydroNode::Scan { input, .. } => {
3831 vec![input]
3832 }
3833 HydroNode::ReduceKeyedWatermark {
3834 input, watermark, ..
3835 } => {
3836 vec![input, watermark]
3837 }
3838 }
3839 }
3840
3841 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3842 self.input()
3843 .iter()
3844 .map(|input_node| input_node.metadata())
3845 .collect()
3846 }
3847
3848 pub fn print_root(&self) -> String {
3849 match self {
3850 HydroNode::Placeholder => {
3851 panic!()
3852 }
3853 HydroNode::Cast { .. } => "Cast()".to_owned(),
3854 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3855 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3856 HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3857 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
3858 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3859 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3860 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3861 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3862 HydroNode::Batch { .. } => "Batch()".to_owned(),
3863 HydroNode::Chain { first, second, .. } => {
3864 format!("Chain({}, {})", first.print_root(), second.print_root())
3865 }
3866 HydroNode::ChainFirst { first, second, .. } => {
3867 format!(
3868 "ChainFirst({}, {})",
3869 first.print_root(),
3870 second.print_root()
3871 )
3872 }
3873 HydroNode::CrossProduct { left, right, .. } => {
3874 format!(
3875 "CrossProduct({}, {})",
3876 left.print_root(),
3877 right.print_root()
3878 )
3879 }
3880 HydroNode::CrossSingleton { left, right, .. } => {
3881 format!(
3882 "CrossSingleton({}, {})",
3883 left.print_root(),
3884 right.print_root()
3885 )
3886 }
3887 HydroNode::Join { left, right, .. } => {
3888 format!("Join({}, {})", left.print_root(), right.print_root())
3889 }
3890 HydroNode::Difference { pos, neg, .. } => {
3891 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3892 }
3893 HydroNode::AntiJoin { pos, neg, .. } => {
3894 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3895 }
3896 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3897 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3898 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3899 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3900 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3901 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3902 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3903 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3904 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3905 HydroNode::Unique { .. } => "Unique()".to_owned(),
3906 HydroNode::Sort { .. } => "Sort()".to_owned(),
3907 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3908 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3909 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3910 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3911 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3912 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3913 HydroNode::Network { .. } => "Network()".to_owned(),
3914 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3915 HydroNode::Counter { tag, duration, .. } => {
3916 format!("Counter({:?}, {:?})", tag, duration)
3917 }
3918 }
3919 }
3920}
3921
3922#[cfg(feature = "build")]
3923fn instantiate_network<'a, D>(
3924 env: &mut D::InstantiateEnv,
3925 from_location: &LocationId,
3926 to_location: &LocationId,
3927 processes: &SparseSecondaryMap<LocationKey, D::Process>,
3928 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
3929 name: Option<&str>,
3930 networking_info: &crate::networking::NetworkingInfo,
3931) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
3932where
3933 D: Deploy<'a>,
3934{
3935 let ((sink, source), connect_fn) = match (from_location, to_location) {
3936 (&LocationId::Process(from), &LocationId::Process(to)) => {
3937 let from_node = processes
3938 .get(from)
3939 .unwrap_or_else(|| {
3940 panic!("A process used in the graph was not instantiated: {}", from)
3941 })
3942 .clone();
3943 let to_node = processes
3944 .get(to)
3945 .unwrap_or_else(|| {
3946 panic!("A process used in the graph was not instantiated: {}", to)
3947 })
3948 .clone();
3949
3950 let sink_port = from_node.next_port();
3951 let source_port = to_node.next_port();
3952
3953 (
3954 D::o2o_sink_source(
3955 env,
3956 &from_node,
3957 &sink_port,
3958 &to_node,
3959 &source_port,
3960 name,
3961 networking_info,
3962 ),
3963 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
3964 )
3965 }
3966 (&LocationId::Process(from), &LocationId::Cluster(to)) => {
3967 let from_node = processes
3968 .get(from)
3969 .unwrap_or_else(|| {
3970 panic!("A process used in the graph was not instantiated: {}", from)
3971 })
3972 .clone();
3973 let to_node = clusters
3974 .get(to)
3975 .unwrap_or_else(|| {
3976 panic!("A cluster used in the graph was not instantiated: {}", to)
3977 })
3978 .clone();
3979
3980 let sink_port = from_node.next_port();
3981 let source_port = to_node.next_port();
3982
3983 (
3984 D::o2m_sink_source(
3985 env,
3986 &from_node,
3987 &sink_port,
3988 &to_node,
3989 &source_port,
3990 name,
3991 networking_info,
3992 ),
3993 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
3994 )
3995 }
3996 (&LocationId::Cluster(from), &LocationId::Process(to)) => {
3997 let from_node = clusters
3998 .get(from)
3999 .unwrap_or_else(|| {
4000 panic!("A cluster used in the graph was not instantiated: {}", from)
4001 })
4002 .clone();
4003 let to_node = processes
4004 .get(to)
4005 .unwrap_or_else(|| {
4006 panic!("A process used in the graph was not instantiated: {}", to)
4007 })
4008 .clone();
4009
4010 let sink_port = from_node.next_port();
4011 let source_port = to_node.next_port();
4012
4013 (
4014 D::m2o_sink_source(
4015 env,
4016 &from_node,
4017 &sink_port,
4018 &to_node,
4019 &source_port,
4020 name,
4021 networking_info,
4022 ),
4023 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
4024 )
4025 }
4026 (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
4027 let from_node = clusters
4028 .get(from)
4029 .unwrap_or_else(|| {
4030 panic!("A cluster used in the graph was not instantiated: {}", from)
4031 })
4032 .clone();
4033 let to_node = clusters
4034 .get(to)
4035 .unwrap_or_else(|| {
4036 panic!("A cluster used in the graph was not instantiated: {}", to)
4037 })
4038 .clone();
4039
4040 let sink_port = from_node.next_port();
4041 let source_port = to_node.next_port();
4042
4043 (
4044 D::m2m_sink_source(
4045 env,
4046 &from_node,
4047 &sink_port,
4048 &to_node,
4049 &source_port,
4050 name,
4051 networking_info,
4052 ),
4053 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
4054 )
4055 }
4056 (LocationId::Tick(_, _), _) => panic!(),
4057 (_, LocationId::Tick(_, _)) => panic!(),
4058 (LocationId::Atomic(_), _) => panic!(),
4059 (_, LocationId::Atomic(_)) => panic!(),
4060 };
4061 (sink, source, connect_fn)
4062}
4063
4064#[cfg(test)]
4065mod test {
4066 use std::mem::size_of;
4067
4068 use stageleft::{QuotedWithContext, q};
4069
4070 use super::*;
4071
4072 #[test]
4073 #[cfg_attr(
4074 not(feature = "build"),
4075 ignore = "expects inclusion of feature-gated fields"
4076 )]
4077 fn hydro_node_size() {
4078 assert_eq!(size_of::<HydroNode>(), 248);
4079 }
4080
4081 #[test]
4082 #[cfg_attr(
4083 not(feature = "build"),
4084 ignore = "expects inclusion of feature-gated fields"
4085 )]
4086 fn hydro_root_size() {
4087 assert_eq!(size_of::<HydroRoot>(), 136);
4088 }
4089
4090 #[test]
4091 fn test_simplify_q_macro_basic() {
4092 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
4094 let result = simplify_q_macro(simple_expr.clone());
4095 assert_eq!(result, simple_expr);
4096 }
4097
4098 #[test]
4099 fn test_simplify_q_macro_actual_stageleft_call() {
4100 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
4102 let result = simplify_q_macro(stageleft_call);
4103 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4106 }
4107
4108 #[test]
4109 fn test_closure_no_pipe_at_start() {
4110 let stageleft_call = q!({
4112 let foo = 123;
4113 move |b: usize| b + foo
4114 })
4115 .splice_fn1_ctx(&());
4116 let result = simplify_q_macro(stageleft_call);
4117 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
4118 }
4119}