1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::Debug;
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21
22#[cfg(feature = "build")]
23use crate::deploy::{Deploy, RegisterPort};
24use crate::location::LocationId;
25
/// Newtype around a boxed [`syn::Expr`].
///
/// The `Debug` implementation for this type prints the expression's token
/// stream rather than the `syn` syntax tree.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
31
32impl From<syn::Expr> for DebugExpr {
33 fn from(expr: syn::Expr) -> Self {
34 Self(Box::new(expr))
35 }
36}
37
38impl Deref for DebugExpr {
39 type Target = syn::Expr;
40
41 fn deref(&self) -> &Self::Target {
42 &self.0
43 }
44}
45
46impl ToTokens for DebugExpr {
47 fn to_tokens(&self, tokens: &mut TokenStream) {
48 self.0.to_tokens(tokens);
49 }
50}
51
52impl Debug for DebugExpr {
53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 write!(f, "{}", self.0.to_token_stream())
55 }
56}
57
/// Newtype around a boxed [`syn::Type`].
///
/// The `Debug` implementation for this type prints the type's token stream
/// rather than the `syn` syntax tree.
#[derive(Clone, Hash)]
pub struct DebugType(pub Box<syn::Type>);
63
64impl From<syn::Type> for DebugType {
65 fn from(t: syn::Type) -> Self {
66 Self(Box::new(t))
67 }
68}
69
70impl Deref for DebugType {
71 type Target = syn::Type;
72
73 fn deref(&self) -> &Self::Target {
74 &self.0
75 }
76}
77
78impl ToTokens for DebugType {
79 fn to_tokens(&self, tokens: &mut TokenStream) {
80 self.0.to_tokens(tokens);
81 }
82}
83
84impl Debug for DebugType {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 write!(f, "{}", self.0.to_token_stream())
87 }
88}
89
/// Instantiation state of a [`HydroNode::Network`].
pub enum DebugInstantiate {
    /// The network has not been instantiated yet.
    Building,
    /// The network has been instantiated; holds the resulting sink/source
    /// expressions and the pending connect callback.
    Finalized(Box<DebugInstantiateFinalized>),
}
94
/// Payload of [`DebugInstantiate::Finalized`].
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    // Sink expression produced when the network was instantiated.
    sink: syn::Expr,
    // Source expression produced when the network was instantiated.
    source: syn::Expr,
    // One-shot callback; `HydroNode::connect_network` takes it out of the
    // `Option` and calls it.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
107
108impl From<DebugInstantiateFinalized> for DebugInstantiate {
109 fn from(f: DebugInstantiateFinalized) -> Self {
110 Self::Finalized(Box::new(f))
111 }
112}
113
114impl Debug for DebugInstantiate {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 write!(f, "<network instantiate>")
117 }
118}
119
impl Hash for DebugInstantiate {
    /// Feeds nothing into the hasher, so the instantiation state never
    /// contributes to the hash of the enclosing node.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
125
126impl Clone for DebugInstantiate {
127 fn clone(&self) -> Self {
128 match self {
129 DebugInstantiate::Building => DebugInstantiate::Building,
130 DebugInstantiate::Finalized(_) => {
131 panic!("DebugInstantiate::Finalized should not be cloned")
132 }
133 }
134 }
135}
136
/// The kind of input feeding a [`HydroNode::Source`].
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Emitted as `source_stream(<expr>)`.
    Stream(DebugExpr),
    /// Emits no statement; `HydroNode::emit_core` returns a `DUMMY` ident for it.
    ExternalNetwork(),
    /// Emitted as `source_iter(<expr>)`.
    Iter(DebugExpr),
    /// Emitted as `spin()`.
    Spin(),
}
145
/// Selects what the `emit_core` traversal does at each IR element: either add
/// DFIR statements to per-location graph builders, or invoke user callbacks.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroLeaf, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
{
    /// Graph builders keyed by location id.
    Builders(&'a mut BTreeMap<usize, FlatGraphBuilder>),
    /// Leaf callback and node callback; each receives the element and the
    /// current statement id counter.
    Callback(L, N),
}
155
/// A terminal element of the IR: each variant consumes exactly one
/// [`HydroNode`] input.
#[derive(Debug, Hash)]
pub enum HydroLeaf {
    /// Emitted as `<input> -> for_each(<f>)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Emitted as `<input> -> dest_sink(<sink>)`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Emitted as `<ident> = <input>`; the input must live at the root of
    /// `location_kind`.
    CycleSink {
        ident: syn::Ident,
        location_kind: LocationId,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
178
179impl HydroLeaf {
180 #[cfg(feature = "build")]
181 pub fn compile_network<'a, D>(
182 &mut self,
183 compile_env: &D::CompileEnv,
184 seen_tees: &mut SeenTees,
185 seen_tee_locations: &mut SeenTeeLocations,
186 processes: &HashMap<usize, D::Process>,
187 clusters: &HashMap<usize, D::Cluster>,
188 externals: &HashMap<usize, D::ExternalProcess>,
189 ) where
190 D: Deploy<'a>,
191 {
192 self.transform_children(
193 |n, s| {
194 n.compile_network::<D>(
195 compile_env,
196 s,
197 seen_tee_locations,
198 processes,
199 clusters,
200 externals,
201 );
202 },
203 seen_tees,
204 )
205 }
206
207 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
208 self.transform_children(
209 |n, s| {
210 n.connect_network(s);
211 },
212 seen_tees,
213 )
214 }
215
216 pub fn transform_bottom_up(
217 &mut self,
218 transform_leaf: &mut impl FnMut(&mut HydroLeaf),
219 transform_node: &mut impl FnMut(&mut HydroNode),
220 seen_tees: &mut SeenTees,
221 ) {
222 self.transform_children(|n, s| n.transform_bottom_up(transform_node, s), seen_tees);
223
224 transform_leaf(self);
225 }
226
227 pub fn transform_children(
228 &mut self,
229 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
230 seen_tees: &mut SeenTees,
231 ) {
232 match self {
233 HydroLeaf::ForEach { f: _, input, .. }
234 | HydroLeaf::DestSink { sink: _, input, .. }
235 | HydroLeaf::CycleSink {
236 ident: _,
237 location_kind: _,
238 input,
239 ..
240 } => {
241 transform(input, seen_tees);
242 }
243 }
244 }
245
246 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroLeaf {
247 match self {
248 HydroLeaf::ForEach { f, input, metadata } => HydroLeaf::ForEach {
249 f: f.clone(),
250 input: Box::new(input.deep_clone(seen_tees)),
251 metadata: metadata.clone(),
252 },
253 HydroLeaf::DestSink {
254 sink,
255 input,
256 metadata,
257 } => HydroLeaf::DestSink {
258 sink: sink.clone(),
259 input: Box::new(input.deep_clone(seen_tees)),
260 metadata: metadata.clone(),
261 },
262 HydroLeaf::CycleSink {
263 ident,
264 location_kind,
265 input,
266 metadata,
267 } => HydroLeaf::CycleSink {
268 ident: ident.clone(),
269 location_kind: location_kind.clone(),
270 input: Box::new(input.deep_clone(seen_tees)),
271 metadata: metadata.clone(),
272 },
273 }
274 }
275
276 #[cfg(feature = "build")]
277 pub fn emit(
278 &mut self,
279 graph_builders: &mut BTreeMap<usize, FlatGraphBuilder>,
280 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
281 next_stmt_id: &mut usize,
282 ) {
283 self.emit_core(
284 &mut BuildersOrCallback::Builders::<
285 fn(&mut HydroLeaf, &mut usize),
286 fn(&mut HydroNode, &mut usize),
287 >(graph_builders),
288 built_tees,
289 next_stmt_id,
290 );
291 }
292
293 #[cfg(feature = "build")]
294 pub fn emit_core(
295 &mut self,
296 builders_or_callback: &mut BuildersOrCallback<
297 impl FnMut(&mut HydroLeaf, &mut usize),
298 impl FnMut(&mut HydroNode, &mut usize),
299 >,
300 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
301 next_stmt_id: &mut usize,
302 ) {
303 match self {
304 HydroLeaf::ForEach { f, input, .. } => {
305 let (input_ident, input_location_id) =
306 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
307
308 match builders_or_callback {
309 BuildersOrCallback::Builders(graph_builders) => {
310 graph_builders
311 .entry(input_location_id)
312 .or_default()
313 .add_dfir(
314 parse_quote! {
315 #input_ident -> for_each(#f);
316 },
317 None,
318 Some(&next_stmt_id.to_string()),
319 );
320 }
321 BuildersOrCallback::Callback(leaf_callback, _) => {
322 leaf_callback(self, next_stmt_id);
323 }
324 }
325
326 *next_stmt_id += 1;
327 }
328
329 HydroLeaf::DestSink { sink, input, .. } => {
330 let (input_ident, input_location_id) =
331 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
332
333 match builders_or_callback {
334 BuildersOrCallback::Builders(graph_builders) => {
335 graph_builders
336 .entry(input_location_id)
337 .or_default()
338 .add_dfir(
339 parse_quote! {
340 #input_ident -> dest_sink(#sink);
341 },
342 None,
343 Some(&next_stmt_id.to_string()),
344 );
345 }
346 BuildersOrCallback::Callback(leaf_callback, _) => {
347 leaf_callback(self, next_stmt_id);
348 }
349 }
350
351 *next_stmt_id += 1;
352 }
353
354 HydroLeaf::CycleSink {
355 ident,
356 location_kind,
357 input,
358 ..
359 } => {
360 let (input_ident, input_location_id) =
361 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
362
363 let location_id = match location_kind.root() {
364 LocationId::Process(id) => id,
365 LocationId::Cluster(id) => id,
366 LocationId::Tick(_, _) => panic!(),
367 LocationId::ExternalProcess(_) => panic!(),
368 };
369
370 assert_eq!(
371 input_location_id, *location_id,
372 "cycle_sink location mismatch"
373 );
374
375 match builders_or_callback {
376 BuildersOrCallback::Builders(graph_builders) => {
377 graph_builders.entry(*location_id).or_default().add_dfir(
378 parse_quote! {
379 #ident = #input_ident;
380 },
381 None,
382 None,
383 );
384 }
385 BuildersOrCallback::Callback(_, _) => {}
387 }
388 }
389 }
390 }
391
392 pub fn metadata(&self) -> &HydroIrMetadata {
393 match self {
394 HydroLeaf::ForEach { metadata, .. }
395 | HydroLeaf::DestSink { metadata, .. }
396 | HydroLeaf::CycleSink { metadata, .. } => metadata,
397 }
398 }
399
400 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
401 match self {
402 HydroLeaf::ForEach { metadata, .. }
403 | HydroLeaf::DestSink { metadata, .. }
404 | HydroLeaf::CycleSink { metadata, .. } => metadata,
405 }
406 }
407
408 pub fn print_root(&self) -> String {
409 match self {
410 HydroLeaf::ForEach { f, .. } => format!("ForEach({:?})", f),
411 HydroLeaf::DestSink { sink, .. } => format!("DestSink({:?})", sink),
412 HydroLeaf::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
413 }
414 }
415}
416
/// Emits every leaf in `ir` into a fresh set of graph builders keyed by
/// location id and returns them.
///
/// Statement ids start at zero and the tee bookkeeping is shared across all
/// leaves. Takes a slice because the leaves are only iterated; a
/// `&mut Vec<HydroLeaf>` argument still coerces at the call site.
#[cfg(feature = "build")]
pub fn emit(ir: &mut [HydroLeaf]) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut builders = BTreeMap::new();
    let mut built_tees = HashMap::new();
    let mut next_stmt_id = 0;
    for leaf in ir {
        leaf.emit(&mut builders, &mut built_tees, &mut next_stmt_id);
    }
    builders
}
427
/// Walks `ir` in emission order without building any graph, invoking
/// `transform_leaf` / `transform_node` through the `Callback` mode of
/// `emit_core`. Statement ids start at zero.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroLeaf],
    transform_leaf: impl FnMut(&mut HydroLeaf, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut callbacks = BuildersOrCallback::Callback(transform_leaf, transform_node);
    let mut built_tees = HashMap::new();
    let mut stmt_id = 0;

    for leaf in ir {
        leaf.emit_core(&mut callbacks, &mut built_tees, &mut stmt_id);
    }
}
441
442pub fn transform_bottom_up(
443 ir: &mut [HydroLeaf],
444 transform_leaf: &mut impl FnMut(&mut HydroLeaf),
445 transform_node: &mut impl FnMut(&mut HydroNode),
446) {
447 let mut seen_tees = HashMap::new();
448 ir.iter_mut().for_each(|leaf| {
449 leaf.transform_bottom_up(transform_leaf, transform_node, &mut seen_tees);
450 });
451}
452
453pub fn deep_clone(ir: &[HydroLeaf]) -> Vec<HydroLeaf> {
454 let mut seen_tees = HashMap::new();
455 ir.iter()
456 .map(|leaf| leaf.deep_clone(&mut seen_tees))
457 .collect()
458}
459
// Dedup state for `TeeNode`'s `Debug` output: `None` when deduplication is
// off, otherwise the next id to hand out plus the ids already assigned,
// keyed by tee pointer.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Starts as `None`; `dbg_dedup_tee` switches it on for the duration of a call.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
464
465pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
466 PRINTED_TEES.with(|printed_tees| {
467 let mut printed_tees_mut = printed_tees.borrow_mut();
468 *printed_tees_mut = Some((0, HashMap::new()));
469 drop(printed_tees_mut);
470
471 let ret = f();
472
473 let mut printed_tees_mut = printed_tees.borrow_mut();
474 *printed_tees_mut = None;
475
476 ret
477 })
478}
479
/// Shared, interior-mutable handle to a [`HydroNode`], used as the input of
/// [`HydroNode::Tee`].
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
481
482impl TeeNode {
483 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
484 Rc::as_ptr(&self.0)
485 }
486}
487
488impl Debug for TeeNode {
489 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
490 PRINTED_TEES.with(|printed_tees| {
491 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
492 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
493
494 if let Some(printed_tees_mut) = printed_tees_mut {
495 if let Some(existing) = printed_tees_mut
496 .1
497 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
498 {
499 write!(f, "<tee {}>", existing)
500 } else {
501 let next_id = printed_tees_mut.0;
502 printed_tees_mut.0 += 1;
503 printed_tees_mut
504 .1
505 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
506 drop(printed_tees_mut_borrow);
507 write!(f, "<tee {}>: ", next_id)?;
508 Debug::fmt(&self.0.borrow(), f)
509 }
510 } else {
511 drop(printed_tees_mut_borrow);
512 write!(f, "<tee>: ")?;
513 Debug::fmt(&self.0.borrow(), f)
514 }
515 })
516 }
517}
518
519impl Hash for TeeNode {
520 fn hash<H: Hasher>(&self, state: &mut H) {
521 self.0.borrow_mut().hash(state);
522 }
523}
524
/// Metadata attached to every IR leaf and node.
///
/// The `Hash` and `PartialEq` implementations for this type ignore all
/// fields, so metadata never affects hashing or equality of IR elements.
#[derive(Debug, Clone)]
pub struct HydroIrMetadata {
    pub location_kind: LocationId,
    // `None` when no output type has been recorded.
    pub output_type: Option<DebugType>,
    // NOTE(review): presumably filled in by profiling/analysis passes — confirm.
    pub cardinality: Option<usize>,
    // NOTE(review): presumably filled in by profiling/analysis passes — confirm.
    pub cpu_usage: Option<f64>,
}
532
impl Hash for HydroIrMetadata {
    /// Feeds nothing into the hasher: metadata is excluded from IR hashes.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
537
impl PartialEq for HydroIrMetadata {
    /// Always `true`: any two metadata values compare equal, matching the
    /// no-op `Hash` implementation.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}

impl Eq for HydroIrMetadata {}
545
/// An intermediate node of the IR. Every variant except `Placeholder`
/// carries a [`HydroIrMetadata`].
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in stored inside a tee's cell while that tee is being
    /// transformed or cloned. Traversal and emission panic if they reach it.
    Placeholder,

    /// An input described by a [`HydroSource`] at `location_kind`.
    Source {
        source: HydroSource,
        location_kind: LocationId,
        metadata: HydroIrMetadata,
    },

    /// The reading end of a cycle; emission adds no statement and returns
    /// `ident` as the stream ident.
    CycleSource {
        ident: syn::Ident,
        location_kind: LocationId,
        metadata: HydroIrMetadata,
    },

    /// A node whose input is shared through a [`TeeNode`]; the shared input
    /// is emitted once, followed by `tee()`.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `persist::<'static>()`.
    Persist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Marker node; emission panics if one is still present.
    Unpersist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `multiset_delta()`.
    Delta {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `chain()` with `first` on port 0 and `second` on port 1;
    /// both inputs must be at the same location.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Two-input nodes.
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Single-input nodes.
    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        is_static: bool,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A network edge from the location of `input` to `to_location`.
    ///
    /// `instantiate_fn` starts as `Building`; `compile_network` replaces it
    /// with `Finalized`, and `connect_network` then runs the stored callback.
    Network {
        from_key: Option<usize>,
        to_location: LocationId,
        to_key: Option<usize>,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
716
/// Maps the address of an already-visited tee cell to the cell that replaces
/// it (the transformed or cloned copy).
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee cell to the location recorded for it during
/// `compile_network`.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
719
720impl HydroNode {
/// Instantiates every `Network` node in this subtree.
///
/// Walks the subtree bottom-up while tracking `curr_location`, the location
/// of the most recently visited `Source`, `CycleSource`, `Network` or `Tee`.
/// Each `Network` still in the `Building` state is passed to
/// `instantiate_network::<D>` with `curr_location` as the sending location,
/// and its `instantiate_fn` is replaced with the finalized result.
///
/// # Panics
///
/// Panics if a `Network` is already finalized, or if a `Network` or a
/// first-seen `Tee` is reached before any location has been recorded.
#[cfg(feature = "build")]
pub fn compile_network<'a, D>(
    &mut self,
    compile_env: &D::CompileEnv,
    seen_tees: &mut SeenTees,
    seen_tee_locations: &mut SeenTeeLocations,
    nodes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
    externals: &HashMap<usize, D::ExternalProcess>,
) where
    D: Deploy<'a>,
{
    // Location of the input most recently visited by the bottom-up walk.
    let mut curr_location = None;

    self.transform_bottom_up(
        &mut |n| {
            if let HydroNode::Network {
                from_key,
                to_location,
                to_key,
                instantiate_fn,
                ..
            } = n
            {
                // `curr_location` still refers to the network's input here,
                // i.e. the sending side.
                let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
                    DebugInstantiate::Building => instantiate_network::<D>(
                        curr_location.as_ref().unwrap(),
                        *from_key,
                        to_location,
                        *to_key,
                        nodes,
                        clusters,
                        externals,
                        compile_env,
                    ),

                    DebugInstantiate::Finalized(_) => panic!("network already finalized"),
                };

                *instantiate_fn = DebugInstantiateFinalized {
                    sink: sink_expr,
                    source: source_expr,
                    connect_fn: Some(connect_fn),
                }
                .into();
            }

            // Update the tracked location for whatever consumes this node.
            match n {
                HydroNode::Network {
                    to_location: location_kind,
                    ..
                }
                | HydroNode::CycleSource { location_kind, .. }
                | HydroNode::Source { location_kind, .. } => {
                    if let LocationId::Tick(_, tick_loc) = location_kind {
                        // Record the location wrapped by the tick, not the tick itself.
                        curr_location = Some(*tick_loc.clone());
                    } else {
                        curr_location = Some(location_kind.clone());
                    }
                }
                HydroNode::Tee { inner, .. } => {
                    // The input of an already-seen tee is not walked again, so
                    // restore the location recorded on its first visit.
                    if let Some(tee_location) = seen_tee_locations.get(&inner.as_ptr()) {
                        curr_location = Some(tee_location.clone());
                    } else {
                        seen_tee_locations
                            .insert(inner.as_ptr(), curr_location.as_ref().unwrap().clone());
                    }
                }
                _ => {}
            }
        },
        seen_tees,
    );
}
797
798 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
799 self.transform_bottom_up(
800 &mut |n| {
801 if let HydroNode::Network { instantiate_fn, .. } = n {
802 match instantiate_fn {
803 DebugInstantiate::Building => panic!("network not built"),
804
805 DebugInstantiate::Finalized(finalized) => {
806 (finalized.connect_fn.take().unwrap())();
807 }
808 }
809 }
810 },
811 seen_tees,
812 );
813 }
814
815 pub fn transform_bottom_up(
816 &mut self,
817 transform: &mut impl FnMut(&mut HydroNode),
818 seen_tees: &mut SeenTees,
819 ) {
820 self.transform_children(|n, s| n.transform_bottom_up(transform, s), seen_tees);
821
822 transform(self);
823 }
824
/// Calls `transform` once on each direct input of this node, in field order.
///
/// A tee's shared input is transformed only the first time it is reached;
/// `seen_tees` remembers the replacement cell so later tees over the same
/// input are re-pointed at it instead of being transformed again.
///
/// # Panics
///
/// Panics when called on `HydroNode::Placeholder`.
#[inline(always)]
pub fn transform_children(
    &mut self,
    mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
    seen_tees: &mut SeenTees,
) {
    match self {
        HydroNode::Placeholder => {
            panic!();
        }

        // No inputs to visit.
        HydroNode::Source { .. } | HydroNode::CycleSource { .. } => {}

        HydroNode::Tee { inner, .. } => {
            if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
                // Already handled through another tee: share its replacement cell.
                *inner = TeeNode(transformed.clone());
            } else {
                // Register the replacement cell before recursing, then move
                // the original node out, transform it, and store it in the
                // new cell.
                let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
                seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
                let mut orig = inner.0.replace(HydroNode::Placeholder);
                transform(&mut orig, seen_tees);
                *transformed_cell.borrow_mut() = orig;
                *inner = TeeNode(transformed_cell);
            }
        }

        HydroNode::Persist { inner, .. }
        | HydroNode::Unpersist { inner, .. }
        | HydroNode::Delta { inner, .. } => {
            transform(inner.as_mut(), seen_tees);
        }

        HydroNode::Chain { first, second, .. } => {
            transform(first.as_mut(), seen_tees);
            transform(second.as_mut(), seen_tees);
        }

        HydroNode::CrossSingleton { left, right, .. }
        | HydroNode::CrossProduct { left, right, .. }
        | HydroNode::Join { left, right, .. } => {
            transform(left.as_mut(), seen_tees);
            transform(right.as_mut(), seen_tees);
        }

        HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
            transform(pos.as_mut(), seen_tees);
            transform(neg.as_mut(), seen_tees);
        }

        HydroNode::Map { input, .. }
        | HydroNode::ResolveFutures { input, .. }
        | HydroNode::ResolveFuturesOrdered { input, .. }
        | HydroNode::FlatMap { input, .. }
        | HydroNode::Filter { input, .. }
        | HydroNode::FilterMap { input, .. }
        | HydroNode::Sort { input, .. }
        | HydroNode::DeferTick { input, .. }
        | HydroNode::Enumerate { input, .. }
        | HydroNode::Inspect { input, .. }
        | HydroNode::Unique { input, .. }
        | HydroNode::Network { input, .. }
        | HydroNode::Fold { input, .. }
        | HydroNode::FoldKeyed { input, .. }
        | HydroNode::Reduce { input, .. }
        | HydroNode::ReduceKeyed { input, .. }
        | HydroNode::Counter { input, .. } => {
            transform(input.as_mut(), seen_tees);
        }
    }
}
895
896 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
897 match self {
898 HydroNode::Placeholder => HydroNode::Placeholder,
899 HydroNode::Source {
900 source,
901 location_kind,
902 metadata,
903 } => HydroNode::Source {
904 source: source.clone(),
905 location_kind: location_kind.clone(),
906 metadata: metadata.clone(),
907 },
908 HydroNode::CycleSource {
909 ident,
910 location_kind,
911 metadata,
912 } => HydroNode::CycleSource {
913 ident: ident.clone(),
914 location_kind: location_kind.clone(),
915 metadata: metadata.clone(),
916 },
917 HydroNode::Tee { inner, metadata } => {
918 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
919 HydroNode::Tee {
920 inner: TeeNode(transformed.clone()),
921 metadata: metadata.clone(),
922 }
923 } else {
924 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
925 seen_tees.insert(inner.as_ptr(), new_rc.clone());
926 let cloned = inner.0.borrow().deep_clone(seen_tees);
927 *new_rc.borrow_mut() = cloned;
928 HydroNode::Tee {
929 inner: TeeNode(new_rc),
930 metadata: metadata.clone(),
931 }
932 }
933 }
934 HydroNode::Persist { inner, metadata } => HydroNode::Persist {
935 inner: Box::new(inner.deep_clone(seen_tees)),
936 metadata: metadata.clone(),
937 },
938 HydroNode::Unpersist { inner, metadata } => HydroNode::Unpersist {
939 inner: Box::new(inner.deep_clone(seen_tees)),
940 metadata: metadata.clone(),
941 },
942 HydroNode::Delta { inner, metadata } => HydroNode::Delta {
943 inner: Box::new(inner.deep_clone(seen_tees)),
944 metadata: metadata.clone(),
945 },
946 HydroNode::Chain {
947 first,
948 second,
949 metadata,
950 } => HydroNode::Chain {
951 first: Box::new(first.deep_clone(seen_tees)),
952 second: Box::new(second.deep_clone(seen_tees)),
953 metadata: metadata.clone(),
954 },
955 HydroNode::CrossProduct {
956 left,
957 right,
958 metadata,
959 } => HydroNode::CrossProduct {
960 left: Box::new(left.deep_clone(seen_tees)),
961 right: Box::new(right.deep_clone(seen_tees)),
962 metadata: metadata.clone(),
963 },
964 HydroNode::CrossSingleton {
965 left,
966 right,
967 metadata,
968 } => HydroNode::CrossSingleton {
969 left: Box::new(left.deep_clone(seen_tees)),
970 right: Box::new(right.deep_clone(seen_tees)),
971 metadata: metadata.clone(),
972 },
973 HydroNode::Join {
974 left,
975 right,
976 metadata,
977 } => HydroNode::Join {
978 left: Box::new(left.deep_clone(seen_tees)),
979 right: Box::new(right.deep_clone(seen_tees)),
980 metadata: metadata.clone(),
981 },
982 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
983 pos: Box::new(pos.deep_clone(seen_tees)),
984 neg: Box::new(neg.deep_clone(seen_tees)),
985 metadata: metadata.clone(),
986 },
987 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
988 pos: Box::new(pos.deep_clone(seen_tees)),
989 neg: Box::new(neg.deep_clone(seen_tees)),
990 metadata: metadata.clone(),
991 },
992 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
993 input: Box::new(input.deep_clone(seen_tees)),
994 metadata: metadata.clone(),
995 },
996 HydroNode::ResolveFuturesOrdered { input, metadata } => {
997 HydroNode::ResolveFuturesOrdered {
998 input: Box::new(input.deep_clone(seen_tees)),
999 metadata: metadata.clone(),
1000 }
1001 }
1002 HydroNode::Map { f, input, metadata } => HydroNode::Map {
1003 f: f.clone(),
1004 input: Box::new(input.deep_clone(seen_tees)),
1005 metadata: metadata.clone(),
1006 },
1007 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1008 f: f.clone(),
1009 input: Box::new(input.deep_clone(seen_tees)),
1010 metadata: metadata.clone(),
1011 },
1012 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1013 f: f.clone(),
1014 input: Box::new(input.deep_clone(seen_tees)),
1015 metadata: metadata.clone(),
1016 },
1017 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1018 f: f.clone(),
1019 input: Box::new(input.deep_clone(seen_tees)),
1020 metadata: metadata.clone(),
1021 },
1022 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
1023 input: Box::new(input.deep_clone(seen_tees)),
1024 metadata: metadata.clone(),
1025 },
1026 HydroNode::Enumerate {
1027 is_static,
1028 input,
1029 metadata,
1030 } => HydroNode::Enumerate {
1031 is_static: *is_static,
1032 input: Box::new(input.deep_clone(seen_tees)),
1033 metadata: metadata.clone(),
1034 },
1035 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
1036 f: f.clone(),
1037 input: Box::new(input.deep_clone(seen_tees)),
1038 metadata: metadata.clone(),
1039 },
1040 HydroNode::Unique { input, metadata } => HydroNode::Unique {
1041 input: Box::new(input.deep_clone(seen_tees)),
1042 metadata: metadata.clone(),
1043 },
1044 HydroNode::Sort { input, metadata } => HydroNode::Sort {
1045 input: Box::new(input.deep_clone(seen_tees)),
1046 metadata: metadata.clone(),
1047 },
1048 HydroNode::Fold {
1049 init,
1050 acc,
1051 input,
1052 metadata,
1053 } => HydroNode::Fold {
1054 init: init.clone(),
1055 acc: acc.clone(),
1056 input: Box::new(input.deep_clone(seen_tees)),
1057 metadata: metadata.clone(),
1058 },
1059 HydroNode::FoldKeyed {
1060 init,
1061 acc,
1062 input,
1063 metadata,
1064 } => HydroNode::FoldKeyed {
1065 init: init.clone(),
1066 acc: acc.clone(),
1067 input: Box::new(input.deep_clone(seen_tees)),
1068 metadata: metadata.clone(),
1069 },
1070 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1071 f: f.clone(),
1072 input: Box::new(input.deep_clone(seen_tees)),
1073 metadata: metadata.clone(),
1074 },
1075 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1076 f: f.clone(),
1077 input: Box::new(input.deep_clone(seen_tees)),
1078 metadata: metadata.clone(),
1079 },
1080 HydroNode::Network {
1081 from_key,
1082 to_location,
1083 to_key,
1084 serialize_fn,
1085 instantiate_fn,
1086 deserialize_fn,
1087 input,
1088 metadata,
1089 } => HydroNode::Network {
1090 from_key: *from_key,
1091 to_location: to_location.clone(),
1092 to_key: *to_key,
1093 serialize_fn: serialize_fn.clone(),
1094 instantiate_fn: instantiate_fn.clone(),
1095 deserialize_fn: deserialize_fn.clone(),
1096 input: Box::new(input.deep_clone(seen_tees)),
1097 metadata: metadata.clone(),
1098 },
1099 HydroNode::Counter {
1100 tag,
1101 duration,
1102 input,
1103 metadata,
1104 } => HydroNode::Counter {
1105 tag: tag.clone(),
1106 duration: duration.clone(),
1107 input: Box::new(input.deep_clone(seen_tees)),
1108 metadata: metadata.clone(),
1109 },
1110 }
1111 }
1112
1113 #[cfg(feature = "build")]
1114 pub fn emit_core(
1115 &mut self,
1116 builders_or_callback: &mut BuildersOrCallback<
1117 impl FnMut(&mut HydroLeaf, &mut usize),
1118 impl FnMut(&mut HydroNode, &mut usize),
1119 >,
1120 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
1121 next_stmt_id: &mut usize,
1122 ) -> (syn::Ident, usize) {
1123 match self {
1124 HydroNode::Placeholder => {
1125 panic!()
1126 }
1127
1128 HydroNode::Persist { inner, .. } => {
1129 let (inner_ident, location) =
1130 inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1131
1132 let persist_ident =
1133 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1134
1135 match builders_or_callback {
1136 BuildersOrCallback::Builders(graph_builders) => {
1137 let builder = graph_builders.entry(location).or_default();
1138 builder.add_dfir(
1139 parse_quote! {
1140 #persist_ident = #inner_ident -> persist::<'static>();
1141 },
1142 None,
1143 Some(&next_stmt_id.to_string()),
1144 );
1145 }
1146 BuildersOrCallback::Callback(_, node_callback) => {
1147 node_callback(self, next_stmt_id);
1148 }
1149 }
1150
1151 *next_stmt_id += 1;
1152
1153 (persist_ident, location)
1154 }
1155
1156 HydroNode::Unpersist { .. } => {
1157 panic!(
1158 "Unpersist is a marker node and should have been optimized away. This is likely a compiler bug."
1159 )
1160 }
1161
1162 HydroNode::Delta { inner, .. } => {
1163 let (inner_ident, location) =
1164 inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1165
1166 let delta_ident =
1167 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1168
1169 match builders_or_callback {
1170 BuildersOrCallback::Builders(graph_builders) => {
1171 let builder = graph_builders.entry(location).or_default();
1172 builder.add_dfir(
1173 parse_quote! {
1174 #delta_ident = #inner_ident -> multiset_delta();
1175 },
1176 None,
1177 Some(&next_stmt_id.to_string()),
1178 );
1179 }
1180 BuildersOrCallback::Callback(_, node_callback) => {
1181 node_callback(self, next_stmt_id);
1182 }
1183 }
1184
1185 *next_stmt_id += 1;
1186
1187 (delta_ident, location)
1188 }
1189
1190 HydroNode::Source {
1191 source,
1192 location_kind,
1193 ..
1194 } => {
1195 let location_id = match location_kind.clone() {
1196 LocationId::Process(id) => id,
1197 LocationId::Cluster(id) => id,
1198 LocationId::Tick(_, _) => panic!(),
1199 LocationId::ExternalProcess(id) => id,
1200 };
1201
1202 if let HydroSource::ExternalNetwork() = source {
1203 (syn::Ident::new("DUMMY", Span::call_site()), location_id)
1204 } else {
1205 let source_ident =
1206 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1207
1208 let source_stmt = match source {
1209 HydroSource::Stream(expr) => {
1210 parse_quote! {
1211 #source_ident = source_stream(#expr);
1212 }
1213 }
1214
1215 HydroSource::ExternalNetwork() => {
1216 unreachable!()
1217 }
1218
1219 HydroSource::Iter(expr) => {
1220 parse_quote! {
1221 #source_ident = source_iter(#expr);
1222 }
1223 }
1224
1225 HydroSource::Spin() => {
1226 parse_quote! {
1227 #source_ident = spin();
1228 }
1229 }
1230 };
1231
1232 match builders_or_callback {
1233 BuildersOrCallback::Builders(graph_builders) => {
1234 let builder = graph_builders.entry(location_id).or_default();
1235 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
1236 }
1237 BuildersOrCallback::Callback(_, node_callback) => {
1238 node_callback(self, next_stmt_id);
1239 }
1240 }
1241
1242 *next_stmt_id += 1;
1243
1244 (source_ident, location_id)
1245 }
1246 }
1247
1248 HydroNode::CycleSource {
1249 ident,
1250 location_kind,
1251 ..
1252 } => {
1253 let location_id = *match location_kind.root() {
1254 LocationId::Process(id) => id,
1255 LocationId::Cluster(id) => id,
1256 LocationId::Tick(_, _) => panic!(),
1257 LocationId::ExternalProcess(_) => panic!(),
1258 };
1259
1260 let ident = ident.clone();
1261
1262 match builders_or_callback {
1263 BuildersOrCallback::Builders(_) => {}
1264 BuildersOrCallback::Callback(_, node_callback) => {
1265 node_callback(self, next_stmt_id);
1266 }
1267 }
1268
1269 *next_stmt_id += 1;
1271
1272 (ident, location_id)
1273 }
1274
1275 HydroNode::Tee { inner, .. } => {
1276 let (ret_ident, inner_location_id) = if let Some((teed_from, inner_location_id)) =
1277 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
1278 {
1279 match builders_or_callback {
1280 BuildersOrCallback::Builders(_) => {}
1281 BuildersOrCallback::Callback(_, node_callback) => {
1282 node_callback(self, next_stmt_id);
1283 }
1284 }
1285
1286 (teed_from.clone(), *inner_location_id)
1287 } else {
1288 let (inner_ident, inner_location_id) = inner.0.borrow_mut().emit_core(
1289 builders_or_callback,
1290 built_tees,
1291 next_stmt_id,
1292 );
1293
1294 let tee_ident =
1295 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1296
1297 built_tees.insert(
1298 inner.0.as_ref() as *const RefCell<HydroNode>,
1299 (tee_ident.clone(), inner_location_id),
1300 );
1301
1302 match builders_or_callback {
1303 BuildersOrCallback::Builders(graph_builders) => {
1304 let builder = graph_builders.entry(inner_location_id).or_default();
1305 builder.add_dfir(
1306 parse_quote! {
1307 #tee_ident = #inner_ident -> tee();
1308 },
1309 None,
1310 Some(&next_stmt_id.to_string()),
1311 );
1312 }
1313 BuildersOrCallback::Callback(_, node_callback) => {
1314 node_callback(self, next_stmt_id);
1315 }
1316 }
1317
1318 (tee_ident, inner_location_id)
1319 };
1320
1321 *next_stmt_id += 1;
1325 (ret_ident, inner_location_id)
1326 }
1327
1328 HydroNode::Chain { first, second, .. } => {
1329 let (first_ident, first_location_id) =
1330 first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1331 let (second_ident, second_location_id) =
1332 second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1333
1334 assert_eq!(
1335 first_location_id, second_location_id,
1336 "chain inputs must be in the same location"
1337 );
1338
1339 let chain_ident =
1340 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1341
1342 match builders_or_callback {
1343 BuildersOrCallback::Builders(graph_builders) => {
1344 let builder = graph_builders.entry(first_location_id).or_default();
1345 builder.add_dfir(
1346 parse_quote! {
1347 #chain_ident = chain();
1348 #first_ident -> [0]#chain_ident;
1349 #second_ident -> [1]#chain_ident;
1350 },
1351 None,
1352 Some(&next_stmt_id.to_string()),
1353 );
1354 }
1355 BuildersOrCallback::Callback(_, node_callback) => {
1356 node_callback(self, next_stmt_id);
1357 }
1358 }
1359
1360 *next_stmt_id += 1;
1361
1362 (chain_ident, first_location_id)
1363 }
1364
1365 HydroNode::CrossSingleton { left, right, .. } => {
1366 let (left_ident, left_location_id) =
1367 left.emit_core(builders_or_callback, built_tees, next_stmt_id);
1368 let (right_ident, right_location_id) =
1369 right.emit_core(builders_or_callback, built_tees, next_stmt_id);
1370
1371 assert_eq!(
1372 left_location_id, right_location_id,
1373 "cross_singleton inputs must be in the same location"
1374 );
1375
1376 let cross_ident =
1377 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1378
1379 match builders_or_callback {
1380 BuildersOrCallback::Builders(graph_builders) => {
1381 let builder = graph_builders.entry(left_location_id).or_default();
1382 builder.add_dfir(
1383 parse_quote! {
1384 #cross_ident = cross_singleton();
1385 #left_ident -> [input]#cross_ident;
1386 #right_ident -> [single]#cross_ident;
1387 },
1388 None,
1389 Some(&next_stmt_id.to_string()),
1390 );
1391 }
1392 BuildersOrCallback::Callback(_, node_callback) => {
1393 node_callback(self, next_stmt_id);
1394 }
1395 }
1396
1397 *next_stmt_id += 1;
1398
1399 (cross_ident, left_location_id)
1400 }
1401
1402 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
1403 let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
1404 parse_quote!(cross_join_multiset)
1405 } else {
1406 parse_quote!(join_multiset)
1407 };
1408
1409 let (HydroNode::CrossProduct { left, right, .. }
1410 | HydroNode::Join { left, right, .. }) = self
1411 else {
1412 unreachable!()
1413 };
1414
1415 let (left_inner, left_lifetime) =
1416 if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
1417 (left, quote!('static))
1418 } else {
1419 (left, quote!('tick))
1420 };
1421
1422 let (right_inner, right_lifetime) =
1423 if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
1424 (right, quote!('static))
1425 } else {
1426 (right, quote!('tick))
1427 };
1428
1429 let (left_ident, left_location_id) =
1430 left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1431 let (right_ident, right_location_id) =
1432 right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1433
1434 assert_eq!(
1435 left_location_id, right_location_id,
1436 "join / cross product inputs must be in the same location"
1437 );
1438
1439 let stream_ident =
1440 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1441
1442 match builders_or_callback {
1443 BuildersOrCallback::Builders(graph_builders) => {
1444 let builder = graph_builders.entry(left_location_id).or_default();
1445 builder.add_dfir(
1446 parse_quote! {
1447 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
1448 #left_ident -> [0]#stream_ident;
1449 #right_ident -> [1]#stream_ident;
1450 },
1451 None,
1452 Some(&next_stmt_id.to_string()),
1453 );
1454 }
1455 BuildersOrCallback::Callback(_, node_callback) => {
1456 node_callback(self, next_stmt_id);
1457 }
1458 }
1459
1460 *next_stmt_id += 1;
1461
1462 (stream_ident, left_location_id)
1463 }
1464
1465 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
1466 let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
1467 parse_quote!(difference_multiset)
1468 } else {
1469 parse_quote!(anti_join_multiset)
1470 };
1471
1472 let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
1473 self
1474 else {
1475 unreachable!()
1476 };
1477
1478 let (neg, neg_lifetime) =
1479 if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
1480 (neg, quote!('static))
1481 } else {
1482 (neg, quote!('tick))
1483 };
1484
1485 let (pos_ident, pos_location_id) =
1486 pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
1487 let (neg_ident, neg_location_id) =
1488 neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
1489
1490 assert_eq!(
1491 pos_location_id, neg_location_id,
1492 "difference / anti join inputs must be in the same location"
1493 );
1494
1495 let stream_ident =
1496 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1497
1498 match builders_or_callback {
1499 BuildersOrCallback::Builders(graph_builders) => {
1500 let builder = graph_builders.entry(pos_location_id).or_default();
1501 builder.add_dfir(
1502 parse_quote! {
1503 #stream_ident = #operator::<'tick, #neg_lifetime>();
1504 #pos_ident -> [pos]#stream_ident;
1505 #neg_ident -> [neg]#stream_ident;
1506 },
1507 None,
1508 Some(&next_stmt_id.to_string()),
1509 );
1510 }
1511 BuildersOrCallback::Callback(_, node_callback) => {
1512 node_callback(self, next_stmt_id);
1513 }
1514 }
1515
1516 *next_stmt_id += 1;
1517
1518 (stream_ident, pos_location_id)
1519 }
1520
1521 HydroNode::ResolveFutures { input, .. } => {
1522 let (input_ident, input_location_id) =
1523 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1524
1525 let futures_ident =
1526 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1527
1528 match builders_or_callback {
1529 BuildersOrCallback::Builders(graph_builders) => {
1530 let builder = graph_builders.entry(input_location_id).or_default();
1531 builder.add_dfir(
1532 parse_quote! {
1533 #futures_ident = #input_ident -> resolve_futures();
1534 },
1535 None,
1536 Some(&next_stmt_id.to_string()),
1537 );
1538 }
1539 BuildersOrCallback::Callback(_, node_callback) => {
1540 node_callback(self, next_stmt_id);
1541 }
1542 }
1543
1544 *next_stmt_id += 1;
1545
1546 (futures_ident, input_location_id)
1547 }
1548
1549 HydroNode::ResolveFuturesOrdered { input, .. } => {
1550 let (input_ident, input_location_id) =
1551 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1552
1553 let futures_ident =
1554 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1555
1556 match builders_or_callback {
1557 BuildersOrCallback::Builders(graph_builders) => {
1558 let builder = graph_builders.entry(input_location_id).or_default();
1559 builder.add_dfir(
1560 parse_quote! {
1561 #futures_ident = #input_ident -> resolve_futures_ordered();
1562 },
1563 None,
1564 Some(&next_stmt_id.to_string()),
1565 );
1566 }
1567 BuildersOrCallback::Callback(_, node_callback) => {
1568 node_callback(self, next_stmt_id);
1569 }
1570 }
1571
1572 *next_stmt_id += 1;
1573
1574 (futures_ident, input_location_id)
1575 }
1576
1577 HydroNode::Map { f, input, .. } => {
1578 let (input_ident, input_location_id) =
1579 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1580
1581 let map_ident =
1582 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1583
1584 match builders_or_callback {
1585 BuildersOrCallback::Builders(graph_builders) => {
1586 let builder = graph_builders.entry(input_location_id).or_default();
1587 builder.add_dfir(
1588 parse_quote! {
1589 #map_ident = #input_ident -> map(#f);
1590 },
1591 None,
1592 Some(&next_stmt_id.to_string()),
1593 );
1594 }
1595 BuildersOrCallback::Callback(_, node_callback) => {
1596 node_callback(self, next_stmt_id);
1597 }
1598 }
1599
1600 *next_stmt_id += 1;
1601
1602 (map_ident, input_location_id)
1603 }
1604
1605 HydroNode::FlatMap { f, input, .. } => {
1606 let (input_ident, input_location_id) =
1607 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1608
1609 let flat_map_ident =
1610 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1611
1612 match builders_or_callback {
1613 BuildersOrCallback::Builders(graph_builders) => {
1614 let builder = graph_builders.entry(input_location_id).or_default();
1615 builder.add_dfir(
1616 parse_quote! {
1617 #flat_map_ident = #input_ident -> flat_map(#f);
1618 },
1619 None,
1620 Some(&next_stmt_id.to_string()),
1621 );
1622 }
1623 BuildersOrCallback::Callback(_, node_callback) => {
1624 node_callback(self, next_stmt_id);
1625 }
1626 }
1627
1628 *next_stmt_id += 1;
1629
1630 (flat_map_ident, input_location_id)
1631 }
1632
1633 HydroNode::Filter { f, input, .. } => {
1634 let (input_ident, input_location_id) =
1635 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1636
1637 let filter_ident =
1638 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1639
1640 match builders_or_callback {
1641 BuildersOrCallback::Builders(graph_builders) => {
1642 let builder = graph_builders.entry(input_location_id).or_default();
1643 builder.add_dfir(
1644 parse_quote! {
1645 #filter_ident = #input_ident -> filter(#f);
1646 },
1647 None,
1648 Some(&next_stmt_id.to_string()),
1649 );
1650 }
1651 BuildersOrCallback::Callback(_, node_callback) => {
1652 node_callback(self, next_stmt_id);
1653 }
1654 }
1655
1656 *next_stmt_id += 1;
1657
1658 (filter_ident, input_location_id)
1659 }
1660
1661 HydroNode::FilterMap { f, input, .. } => {
1662 let (input_ident, input_location_id) =
1663 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1664
1665 let filter_map_ident =
1666 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1667
1668 match builders_or_callback {
1669 BuildersOrCallback::Builders(graph_builders) => {
1670 let builder = graph_builders.entry(input_location_id).or_default();
1671 builder.add_dfir(
1672 parse_quote! {
1673 #filter_map_ident = #input_ident -> filter_map(#f);
1674 },
1675 None,
1676 Some(&next_stmt_id.to_string()),
1677 );
1678 }
1679 BuildersOrCallback::Callback(_, node_callback) => {
1680 node_callback(self, next_stmt_id);
1681 }
1682 }
1683
1684 *next_stmt_id += 1;
1685
1686 (filter_map_ident, input_location_id)
1687 }
1688
1689 HydroNode::Sort { input, .. } => {
1690 let (input_ident, input_location_id) =
1691 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1692
1693 let sort_ident =
1694 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1695
1696 match builders_or_callback {
1697 BuildersOrCallback::Builders(graph_builders) => {
1698 let builder = graph_builders.entry(input_location_id).or_default();
1699 builder.add_dfir(
1700 parse_quote! {
1701 #sort_ident = #input_ident -> sort();
1702 },
1703 None,
1704 Some(&next_stmt_id.to_string()),
1705 );
1706 }
1707 BuildersOrCallback::Callback(_, node_callback) => {
1708 node_callback(self, next_stmt_id);
1709 }
1710 }
1711
1712 *next_stmt_id += 1;
1713
1714 (sort_ident, input_location_id)
1715 }
1716
1717 HydroNode::DeferTick { input, .. } => {
1718 let (input_ident, input_location_id) =
1719 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1720
1721 let defer_tick_ident =
1722 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1723
1724 match builders_or_callback {
1725 BuildersOrCallback::Builders(graph_builders) => {
1726 let builder = graph_builders.entry(input_location_id).or_default();
1727 builder.add_dfir(
1728 parse_quote! {
1729 #defer_tick_ident = #input_ident -> defer_tick_lazy();
1730 },
1731 None,
1732 Some(&next_stmt_id.to_string()),
1733 );
1734 }
1735 BuildersOrCallback::Callback(_, node_callback) => {
1736 node_callback(self, next_stmt_id);
1737 }
1738 }
1739
1740 *next_stmt_id += 1;
1741
1742 (defer_tick_ident, input_location_id)
1743 }
1744
1745 HydroNode::Enumerate {
1746 is_static, input, ..
1747 } => {
1748 let (input_ident, input_location_id) =
1749 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1750
1751 let enumerate_ident =
1752 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1753
1754 match builders_or_callback {
1755 BuildersOrCallback::Builders(graph_builders) => {
1756 let builder = graph_builders.entry(input_location_id).or_default();
1757 let lifetime = if *is_static {
1758 quote!('static)
1759 } else {
1760 quote!('tick)
1761 };
1762 builder.add_dfir(
1763 parse_quote! {
1764 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
1765 },
1766 None,
1767 Some(&next_stmt_id.to_string()),
1768 );
1769 }
1770 BuildersOrCallback::Callback(_, node_callback) => {
1771 node_callback(self, next_stmt_id);
1772 }
1773 }
1774
1775 *next_stmt_id += 1;
1776
1777 (enumerate_ident, input_location_id)
1778 }
1779
1780 HydroNode::Inspect { f, input, .. } => {
1781 let (input_ident, input_location_id) =
1782 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1783
1784 let inspect_ident =
1785 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1786
1787 match builders_or_callback {
1788 BuildersOrCallback::Builders(graph_builders) => {
1789 let builder = graph_builders.entry(input_location_id).or_default();
1790 builder.add_dfir(
1791 parse_quote! {
1792 #inspect_ident = #input_ident -> inspect(#f);
1793 },
1794 None,
1795 Some(&next_stmt_id.to_string()),
1796 );
1797 }
1798 BuildersOrCallback::Callback(_, node_callback) => {
1799 node_callback(self, next_stmt_id);
1800 }
1801 }
1802
1803 *next_stmt_id += 1;
1804
1805 (inspect_ident, input_location_id)
1806 }
1807
1808 HydroNode::Unique { input, .. } => {
1809 let (input_ident, input_location_id) =
1810 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1811
1812 let unique_ident =
1813 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1814
1815 match builders_or_callback {
1816 BuildersOrCallback::Builders(graph_builders) => {
1817 let builder = graph_builders.entry(input_location_id).or_default();
1818 builder.add_dfir(
1819 parse_quote! {
1820 #unique_ident = #input_ident -> unique::<'tick>();
1821 },
1822 None,
1823 Some(&next_stmt_id.to_string()),
1824 );
1825 }
1826 BuildersOrCallback::Callback(_, node_callback) => {
1827 node_callback(self, next_stmt_id);
1828 }
1829 }
1830
1831 *next_stmt_id += 1;
1832
1833 (unique_ident, input_location_id)
1834 }
1835
1836 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } => {
1837 let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
1838 parse_quote!(fold)
1839 } else {
1840 parse_quote!(fold_keyed)
1841 };
1842
1843 let (HydroNode::Fold {
1844 init, acc, input, ..
1845 }
1846 | HydroNode::FoldKeyed {
1847 init, acc, input, ..
1848 }) = self
1849 else {
1850 unreachable!()
1851 };
1852
1853 let (input, lifetime) =
1854 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
1855 (input, quote!('static))
1856 } else {
1857 (input, quote!('tick))
1858 };
1859
1860 let (input_ident, input_location_id) =
1861 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1862
1863 let fold_ident =
1864 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1865
1866 match builders_or_callback {
1867 BuildersOrCallback::Builders(graph_builders) => {
1868 let builder = graph_builders.entry(input_location_id).or_default();
1869 builder.add_dfir(
1870 parse_quote! {
1871 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
1872 },
1873 None,
1874 Some(&next_stmt_id.to_string()),
1875 );
1876 }
1877 BuildersOrCallback::Callback(_, node_callback) => {
1878 node_callback(self, next_stmt_id);
1879 }
1880 }
1881
1882 *next_stmt_id += 1;
1883
1884 (fold_ident, input_location_id)
1885 }
1886
1887 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
1888 let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
1889 parse_quote!(reduce)
1890 } else {
1891 parse_quote!(reduce_keyed)
1892 };
1893
1894 let (HydroNode::Reduce { f, input, .. } | HydroNode::ReduceKeyed { f, input, .. }) =
1895 self
1896 else {
1897 unreachable!()
1898 };
1899
1900 let (input, lifetime) =
1901 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
1902 (input, quote!('static))
1903 } else {
1904 (input, quote!('tick))
1905 };
1906
1907 let (input_ident, input_location_id) =
1908 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1909
1910 let reduce_ident =
1911 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1912
1913 match builders_or_callback {
1914 BuildersOrCallback::Builders(graph_builders) => {
1915 let builder = graph_builders.entry(input_location_id).or_default();
1916 builder.add_dfir(
1917 parse_quote! {
1918 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
1919 },
1920 None,
1921 Some(&next_stmt_id.to_string()),
1922 );
1923 }
1924 BuildersOrCallback::Callback(_, node_callback) => {
1925 node_callback(self, next_stmt_id);
1926 }
1927 }
1928
1929 *next_stmt_id += 1;
1930
1931 (reduce_ident, input_location_id)
1932 }
1933
1934 HydroNode::Network {
1935 from_key: _,
1936 to_location,
1937 to_key: _,
1938 serialize_fn: serialize_pipeline,
1939 instantiate_fn,
1940 deserialize_fn: deserialize_pipeline,
1941 input,
1942 ..
1943 } => {
1944 let (input_ident, input_location_id) =
1945 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1946
1947 let to_id = match *to_location {
1948 LocationId::Process(id) => id,
1949 LocationId::Cluster(id) => id,
1950 LocationId::Tick(_, _) => panic!(),
1951 LocationId::ExternalProcess(id) => id,
1952 };
1953
1954 let receiver_stream_ident =
1955 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1956
1957 match builders_or_callback {
1958 BuildersOrCallback::Builders(graph_builders) => {
1959 let (sink_expr, source_expr) = match instantiate_fn {
1960 DebugInstantiate::Building => (
1961 syn::parse_quote!(DUMMY_SINK),
1962 syn::parse_quote!(DUMMY_SOURCE),
1963 ),
1964
1965 DebugInstantiate::Finalized(finalized) => {
1966 (finalized.sink.clone(), finalized.source.clone())
1967 }
1968 };
1969
1970 let sender_builder = graph_builders.entry(input_location_id).or_default();
1971 if let Some(serialize_pipeline) = serialize_pipeline {
1972 sender_builder.add_dfir(
1973 parse_quote! {
1974 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink_expr);
1975 },
1976 None,
1977 Some(&next_stmt_id.to_string()),
1978 );
1979 } else {
1980 sender_builder.add_dfir(
1981 parse_quote! {
1982 #input_ident -> dest_sink(#sink_expr);
1983 },
1984 None,
1985 Some(&next_stmt_id.to_string()),
1986 );
1987 }
1988
1989 let receiver_builder = graph_builders.entry(to_id).or_default();
1990 if let Some(deserialize_pipeline) = deserialize_pipeline {
1991 receiver_builder.add_dfir(parse_quote! {
1992 #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
1993 }, None, Some(&next_stmt_id.to_string()));
1994 } else {
1995 receiver_builder.add_dfir(
1996 parse_quote! {
1997 #receiver_stream_ident = source_stream(#source_expr);
1998 },
1999 None,
2000 Some(&next_stmt_id.to_string()),
2001 );
2002 }
2003 }
2004 BuildersOrCallback::Callback(_, node_callback) => {
2005 node_callback(self, next_stmt_id);
2006 }
2007 }
2008
2009 *next_stmt_id += 1;
2010
2011 (receiver_stream_ident, to_id)
2012 }
2013
2014 HydroNode::Counter {
2015 tag,
2016 duration,
2017 input,
2018 ..
2019 } => {
2020 let (input_ident, input_location_id) =
2021 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
2022
2023 let counter_ident =
2024 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2025
2026 match builders_or_callback {
2027 BuildersOrCallback::Builders(graph_builders) => {
2028 let builder = graph_builders.entry(input_location_id).or_default();
2029 builder.add_dfir(
2030 parse_quote! {
2031 #counter_ident = #input_ident -> _counter(#tag, #duration);
2032 },
2033 None,
2034 Some(&next_stmt_id.to_string()),
2035 );
2036 }
2037 BuildersOrCallback::Callback(_, node_callback) => {
2038 node_callback(self, next_stmt_id);
2039 }
2040 }
2041
2042 *next_stmt_id += 1;
2043
2044 (counter_ident, input_location_id)
2045 }
2046 }
2047 }
2048
2049 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
2050 match self {
2051 HydroNode::Placeholder => {
2052 panic!()
2053 }
2054 HydroNode::Source { source, .. } => match source {
2055 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
2056 HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
2057 },
2058 HydroNode::CycleSource { .. }
2059 | HydroNode::Tee { .. }
2060 | HydroNode::Persist { .. }
2061 | HydroNode::Unpersist { .. }
2062 | HydroNode::Delta { .. }
2063 | HydroNode::Chain { .. }
2064 | HydroNode::CrossProduct { .. }
2065 | HydroNode::CrossSingleton { .. }
2066 | HydroNode::ResolveFutures { .. }
2067 | HydroNode::ResolveFuturesOrdered { .. }
2068 | HydroNode::Join { .. }
2069 | HydroNode::Difference { .. }
2070 | HydroNode::AntiJoin { .. }
2071 | HydroNode::DeferTick { .. }
2072 | HydroNode::Enumerate { .. }
2073 | HydroNode::Unique { .. }
2074 | HydroNode::Sort { .. } => {}
2075 HydroNode::Map { f, .. }
2076 | HydroNode::FlatMap { f, .. }
2077 | HydroNode::Filter { f, .. }
2078 | HydroNode::FilterMap { f, .. }
2079 | HydroNode::Inspect { f, .. }
2080 | HydroNode::Reduce { f, .. }
2081 | HydroNode::ReduceKeyed { f, .. } => {
2082 transform(f);
2083 }
2084 HydroNode::Fold { init, acc, .. } | HydroNode::FoldKeyed { init, acc, .. } => {
2085 transform(init);
2086 transform(acc);
2087 }
2088 HydroNode::Network {
2089 serialize_fn,
2090 deserialize_fn,
2091 ..
2092 } => {
2093 if let Some(serialize_fn) = serialize_fn {
2094 transform(serialize_fn);
2095 }
2096 if let Some(deserialize_fn) = deserialize_fn {
2097 transform(deserialize_fn);
2098 }
2099 }
2100 HydroNode::Counter { duration, .. } => {
2101 transform(duration);
2102 }
2103 }
2104 }
2105
2106 pub fn metadata(&self) -> &HydroIrMetadata {
2107 match self {
2108 HydroNode::Placeholder => {
2109 panic!()
2110 }
2111 HydroNode::Source { metadata, .. } => metadata,
2112 HydroNode::CycleSource { metadata, .. } => metadata,
2113 HydroNode::Tee { metadata, .. } => metadata,
2114 HydroNode::Persist { metadata, .. } => metadata,
2115 HydroNode::Unpersist { metadata, .. } => metadata,
2116 HydroNode::Delta { metadata, .. } => metadata,
2117 HydroNode::Chain { metadata, .. } => metadata,
2118 HydroNode::CrossProduct { metadata, .. } => metadata,
2119 HydroNode::CrossSingleton { metadata, .. } => metadata,
2120 HydroNode::Join { metadata, .. } => metadata,
2121 HydroNode::Difference { metadata, .. } => metadata,
2122 HydroNode::AntiJoin { metadata, .. } => metadata,
2123 HydroNode::ResolveFutures { metadata, .. } => metadata,
2124 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2125 HydroNode::Map { metadata, .. } => metadata,
2126 HydroNode::FlatMap { metadata, .. } => metadata,
2127 HydroNode::Filter { metadata, .. } => metadata,
2128 HydroNode::FilterMap { metadata, .. } => metadata,
2129 HydroNode::DeferTick { metadata, .. } => metadata,
2130 HydroNode::Enumerate { metadata, .. } => metadata,
2131 HydroNode::Inspect { metadata, .. } => metadata,
2132 HydroNode::Unique { metadata, .. } => metadata,
2133 HydroNode::Sort { metadata, .. } => metadata,
2134 HydroNode::Fold { metadata, .. } => metadata,
2135 HydroNode::FoldKeyed { metadata, .. } => metadata,
2136 HydroNode::Reduce { metadata, .. } => metadata,
2137 HydroNode::ReduceKeyed { metadata, .. } => metadata,
2138 HydroNode::Network { metadata, .. } => metadata,
2139 HydroNode::Counter { metadata, .. } => metadata,
2140 }
2141 }
2142
2143 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
2144 match self {
2145 HydroNode::Placeholder => {
2146 panic!()
2147 }
2148 HydroNode::Source { metadata, .. } => metadata,
2149 HydroNode::CycleSource { metadata, .. } => metadata,
2150 HydroNode::Tee { metadata, .. } => metadata,
2151 HydroNode::Persist { metadata, .. } => metadata,
2152 HydroNode::Unpersist { metadata, .. } => metadata,
2153 HydroNode::Delta { metadata, .. } => metadata,
2154 HydroNode::Chain { metadata, .. } => metadata,
2155 HydroNode::CrossProduct { metadata, .. } => metadata,
2156 HydroNode::CrossSingleton { metadata, .. } => metadata,
2157 HydroNode::Join { metadata, .. } => metadata,
2158 HydroNode::Difference { metadata, .. } => metadata,
2159 HydroNode::AntiJoin { metadata, .. } => metadata,
2160 HydroNode::ResolveFutures { metadata, .. } => metadata,
2161 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
2162 HydroNode::Map { metadata, .. } => metadata,
2163 HydroNode::FlatMap { metadata, .. } => metadata,
2164 HydroNode::Filter { metadata, .. } => metadata,
2165 HydroNode::FilterMap { metadata, .. } => metadata,
2166 HydroNode::DeferTick { metadata, .. } => metadata,
2167 HydroNode::Enumerate { metadata, .. } => metadata,
2168 HydroNode::Inspect { metadata, .. } => metadata,
2169 HydroNode::Unique { metadata, .. } => metadata,
2170 HydroNode::Sort { metadata, .. } => metadata,
2171 HydroNode::Fold { metadata, .. } => metadata,
2172 HydroNode::FoldKeyed { metadata, .. } => metadata,
2173 HydroNode::Reduce { metadata, .. } => metadata,
2174 HydroNode::ReduceKeyed { metadata, .. } => metadata,
2175 HydroNode::Network { metadata, .. } => metadata,
2176 HydroNode::Counter { metadata, .. } => metadata,
2177 }
2178 }
2179
2180 pub fn print_root(&self) -> String {
2181 match self {
2182 HydroNode::Placeholder => {
2183 panic!()
2184 }
2185 HydroNode::Source { source, .. } => format!("Source({:?})", source),
2186 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
2187 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
2188 HydroNode::Persist { .. } => "Persist()".to_string(),
2189 HydroNode::Unpersist { .. } => "Unpersist()".to_string(),
2190 HydroNode::Delta { .. } => "Delta()".to_string(),
2191 HydroNode::Chain { first, second, .. } => {
2192 format!("Chain({}, {})", first.print_root(), second.print_root())
2193 }
2194 HydroNode::CrossProduct { left, right, .. } => {
2195 format!(
2196 "CrossProduct({}, {})",
2197 left.print_root(),
2198 right.print_root()
2199 )
2200 }
2201 HydroNode::CrossSingleton { left, right, .. } => {
2202 format!(
2203 "CrossSingleton({}, {})",
2204 left.print_root(),
2205 right.print_root()
2206 )
2207 }
2208 HydroNode::Join { left, right, .. } => {
2209 format!("Join({}, {})", left.print_root(), right.print_root())
2210 }
2211 HydroNode::Difference { pos, neg, .. } => {
2212 format!("Difference({}, {})", pos.print_root(), neg.print_root())
2213 }
2214 HydroNode::AntiJoin { pos, neg, .. } => {
2215 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
2216 }
2217 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_string(),
2218 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_string(),
2219 HydroNode::Map { f, .. } => format!("Map({:?})", f),
2220 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
2221 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
2222 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
2223 HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
2224 HydroNode::Enumerate { is_static, .. } => format!("Enumerate({:?})", is_static),
2225 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
2226 HydroNode::Unique { .. } => "Unique()".to_string(),
2227 HydroNode::Sort { .. } => "Sort()".to_string(),
2228 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
2229 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
2230 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
2231 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
2232 HydroNode::Network { to_location, .. } => format!("Network(to {:?})", to_location),
2233 HydroNode::Counter { tag, duration, .. } => {
2234 format!("Counter({:?}, {:?})", tag, duration)
2235 }
2236 }
2237 }
2238}
2239
2240#[cfg(feature = "build")]
2241#[expect(clippy::too_many_arguments, reason = "networking internals")]
2242fn instantiate_network<'a, D>(
2243 from_location: &LocationId,
2244 from_key: Option<usize>,
2245 to_location: &LocationId,
2246 to_key: Option<usize>,
2247 nodes: &HashMap<usize, D::Process>,
2248 clusters: &HashMap<usize, D::Cluster>,
2249 externals: &HashMap<usize, D::ExternalProcess>,
2250 compile_env: &D::CompileEnv,
2251) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
2252where
2253 D: Deploy<'a>,
2254{
2255 let ((sink, source), connect_fn) = match (from_location, to_location) {
2256 (LocationId::Process(from), LocationId::Process(to)) => {
2257 let from_node = nodes
2258 .get(from)
2259 .unwrap_or_else(|| {
2260 panic!("A process used in the graph was not instantiated: {}", from)
2261 })
2262 .clone();
2263 let to_node = nodes
2264 .get(to)
2265 .unwrap_or_else(|| {
2266 panic!("A process used in the graph was not instantiated: {}", to)
2267 })
2268 .clone();
2269
2270 let sink_port = D::allocate_process_port(&from_node);
2271 let source_port = D::allocate_process_port(&to_node);
2272
2273 (
2274 D::o2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2275 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
2276 )
2277 }
2278 (LocationId::Process(from), LocationId::Cluster(to)) => {
2279 let from_node = nodes
2280 .get(from)
2281 .unwrap_or_else(|| {
2282 panic!("A process used in the graph was not instantiated: {}", from)
2283 })
2284 .clone();
2285 let to_node = clusters
2286 .get(to)
2287 .unwrap_or_else(|| {
2288 panic!("A cluster used in the graph was not instantiated: {}", to)
2289 })
2290 .clone();
2291
2292 let sink_port = D::allocate_process_port(&from_node);
2293 let source_port = D::allocate_cluster_port(&to_node);
2294
2295 (
2296 D::o2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2297 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
2298 )
2299 }
2300 (LocationId::Cluster(from), LocationId::Process(to)) => {
2301 let from_node = clusters
2302 .get(from)
2303 .unwrap_or_else(|| {
2304 panic!("A cluster used in the graph was not instantiated: {}", from)
2305 })
2306 .clone();
2307 let to_node = nodes
2308 .get(to)
2309 .unwrap_or_else(|| {
2310 panic!("A process used in the graph was not instantiated: {}", to)
2311 })
2312 .clone();
2313
2314 let sink_port = D::allocate_cluster_port(&from_node);
2315 let source_port = D::allocate_process_port(&to_node);
2316
2317 (
2318 D::m2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2319 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
2320 )
2321 }
2322 (LocationId::Cluster(from), LocationId::Cluster(to)) => {
2323 let from_node = clusters
2324 .get(from)
2325 .unwrap_or_else(|| {
2326 panic!("A cluster used in the graph was not instantiated: {}", from)
2327 })
2328 .clone();
2329 let to_node = clusters
2330 .get(to)
2331 .unwrap_or_else(|| {
2332 panic!("A cluster used in the graph was not instantiated: {}", to)
2333 })
2334 .clone();
2335
2336 let sink_port = D::allocate_cluster_port(&from_node);
2337 let source_port = D::allocate_cluster_port(&to_node);
2338
2339 (
2340 D::m2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2341 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
2342 )
2343 }
2344 (LocationId::ExternalProcess(from), LocationId::Process(to)) => {
2345 let from_node = externals
2346 .get(from)
2347 .unwrap_or_else(|| {
2348 panic!(
2349 "A external used in the graph was not instantiated: {}",
2350 from
2351 )
2352 })
2353 .clone();
2354
2355 let to_node = nodes
2356 .get(to)
2357 .unwrap_or_else(|| {
2358 panic!("A process used in the graph was not instantiated: {}", to)
2359 })
2360 .clone();
2361
2362 let sink_port = D::allocate_external_port(&from_node);
2363 let source_port = D::allocate_process_port(&to_node);
2364
2365 from_node.register(from_key.unwrap(), sink_port.clone());
2366
2367 (
2368 (
2369 parse_quote!(DUMMY),
2370 D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
2371 ),
2372 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port),
2373 )
2374 }
2375 (LocationId::ExternalProcess(_from), LocationId::Cluster(_to)) => {
2376 todo!("NYI")
2377 }
2378 (LocationId::ExternalProcess(_), LocationId::ExternalProcess(_)) => {
2379 panic!("Cannot send from external to external")
2380 }
2381 (LocationId::Process(from), LocationId::ExternalProcess(to)) => {
2382 let from_node = nodes
2383 .get(from)
2384 .unwrap_or_else(|| {
2385 panic!("A process used in the graph was not instantiated: {}", from)
2386 })
2387 .clone();
2388
2389 let to_node = externals
2390 .get(to)
2391 .unwrap_or_else(|| {
2392 panic!("A external used in the graph was not instantiated: {}", to)
2393 })
2394 .clone();
2395
2396 let sink_port = D::allocate_process_port(&from_node);
2397 let source_port = D::allocate_external_port(&to_node);
2398
2399 to_node.register(to_key.unwrap(), source_port.clone());
2400
2401 (
2402 (
2403 D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
2404 parse_quote!(DUMMY),
2405 ),
2406 D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
2407 )
2408 }
2409 (LocationId::Cluster(_from), LocationId::ExternalProcess(_to)) => {
2410 todo!("NYI")
2411 }
2412 (LocationId::Tick(_, _), _) => panic!(),
2413 (_, LocationId::Tick(_, _)) => panic!(),
2414 };
2415 (sink, source, connect_fn)
2416}
2417
#[cfg(test)]
mod test {
    use super::*;

    /// Compares `size_of::<HydroNode>()` (in bytes) against an inline snapshot;
    /// the assertion fails whenever the computed size differs from the recorded value.
    #[test]
    fn hydro_node_size() {
        let node_bytes = std::mem::size_of::<HydroNode>();
        insta::assert_snapshot!(node_bytes, @"152");
    }

    /// Compares `size_of::<HydroLeaf>()` (in bytes) against an inline snapshot;
    /// the assertion fails whenever the computed size differs from the recorded value.
    #[test]
    fn hydro_leaf_size() {
        let leaf_bytes = std::mem::size_of::<HydroLeaf>();
        insta::assert_snapshot!(leaf_bytes, @"128");
    }
}