1use core::panic;
2use std::cell::RefCell;
3#[cfg(feature = "build")]
4use std::collections::BTreeMap;
5use std::collections::HashMap;
6use std::fmt::Debug;
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use syn::parse_quote;
21
22#[cfg(feature = "build")]
23use crate::deploy::{Deploy, RegisterPort};
24use crate::location::LocationId;
25
/// Newtype wrapper around a [`syn::Expr`].
///
/// The wrapped expression is public (field `0`). In this file the wrapper gets its own
/// `Debug` implementation, which prints the expression's token stream.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub syn::Expr);
28
29impl From<syn::Expr> for DebugExpr {
30 fn from(expr: syn::Expr) -> DebugExpr {
31 DebugExpr(expr)
32 }
33}
34
35impl Deref for DebugExpr {
36 type Target = syn::Expr;
37
38 fn deref(&self) -> &Self::Target {
39 &self.0
40 }
41}
42
43impl ToTokens for DebugExpr {
44 fn to_tokens(&self, tokens: &mut TokenStream) {
45 self.0.to_tokens(tokens);
46 }
47}
48
49impl Debug for DebugExpr {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 write!(f, "{}", self.0.to_token_stream())
52 }
53}
54
/// Newtype wrapper around a [`syn::Type`].
///
/// The wrapped type is public (field `0`). In this file the wrapper gets its own
/// `Debug` implementation, which prints the type's token stream.
#[derive(Clone, Hash)]
pub struct DebugType(pub syn::Type);
57
58impl From<syn::Type> for DebugType {
59 fn from(t: syn::Type) -> DebugType {
60 DebugType(t)
61 }
62}
63
64impl Deref for DebugType {
65 type Target = syn::Type;
66
67 fn deref(&self) -> &Self::Target {
68 &self.0
69 }
70}
71
72impl ToTokens for DebugType {
73 fn to_tokens(&self, tokens: &mut TokenStream) {
74 self.0.to_tokens(tokens);
75 }
76}
77
78impl Debug for DebugType {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 write!(f, "{}", self.0.to_token_stream())
81 }
82}
83
/// Instantiation state carried by a `HydroNode::Network` node.
///
/// In this file, `HydroNode::compile_network` replaces `Building` with `Finalized`, and
/// `HydroNode::connect_network` takes and invokes the closure stored in `Finalized`.
#[allow(clippy::allow_attributes, reason = "Only triggered on nightly.")]
#[allow(
    clippy::large_enum_variant,
    reason = "`Building` is just equivalent to `None`."
)]
pub enum DebugInstantiate {
    /// Not yet instantiated; carries no data.
    Building,
    /// Instantiated. `compile_network` stores `(sink_expr, source_expr, Some(connect_fn))` here;
    /// the closure slot becomes `None` once `connect_network` has taken it.
    Finalized(syn::Expr, syn::Expr, Option<Box<dyn FnOnce()>>),
}
93
94impl Debug for DebugInstantiate {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 write!(f, "<network instantiate>")
97 }
98}
99
impl Hash for DebugInstantiate {
    /// Feeds nothing into the hasher, so every value contributes identically to a hash.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
105
106impl Clone for DebugInstantiate {
107 fn clone(&self) -> Self {
108 match self {
109 DebugInstantiate::Building => DebugInstantiate::Building,
110 DebugInstantiate::Finalized(_, _, _) => {
111 panic!("DebugInstantiate::Finalized should not be cloned")
112 }
113 }
114 }
115}
116
/// The kind of input carried by a `HydroNode::Source` node.
#[derive(Debug, Hash, Clone)]
pub enum HydroSource {
    /// Emitted by `HydroNode::emit_core` as `source_stream(<expr>)`.
    Stream(DebugExpr),
    /// No statement is emitted for this source; `HydroNode::emit_core` returns a `DUMMY` ident.
    ExternalNetwork(),
    /// Emitted by `HydroNode::emit_core` as `source_iter(<expr>)`.
    Iter(DebugExpr),
    /// Emitted by `HydroNode::emit_core` as `spin()`.
    Spin(),
}
125
/// Selects what the `emit_core` traversals do at each visited leaf or node.
///
/// `L` is the leaf callback type and `N` the node callback type; both receive the visited
/// item together with the current statement id counter.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<
    'a,
    L: FnMut(&mut HydroLeaf, &mut usize),
    N: FnMut(&mut HydroNode, &mut usize),
> {
    /// Emit DFIR statements into the graph builder keyed by the location id of each operator.
    Builders(&'a mut BTreeMap<usize, FlatGraphBuilder>),
    /// Emit nothing; call the leaf callback (first) or node callback (second) instead.
    Callback(L, N),
}
135
/// A terminal operator of the IR: it consumes one `HydroNode` input and produces no output node.
#[derive(Debug, Hash)]
pub enum HydroLeaf {
    /// Emitted as `<input> -> for_each(<f>)`.
    ForEach {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Emitted as `<input> -> dest_sink(<sink>)`.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Emitted as `<ident> = <input>`; the input's location must equal the root of
    /// `location_kind` (asserted in `emit_core`).
    CycleSink {
        ident: syn::Ident,
        location_kind: LocationId,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
158
159impl HydroLeaf {
160 #[cfg(feature = "build")]
161 pub fn compile_network<'a, D: Deploy<'a>>(
162 &mut self,
163 compile_env: &D::CompileEnv,
164 seen_tees: &mut SeenTees,
165 seen_tee_locations: &mut SeenTeeLocations,
166 processes: &HashMap<usize, D::Process>,
167 clusters: &HashMap<usize, D::Cluster>,
168 externals: &HashMap<usize, D::ExternalProcess>,
169 ) {
170 self.transform_children(
171 |n, s| {
172 n.compile_network::<D>(
173 compile_env,
174 s,
175 seen_tee_locations,
176 processes,
177 clusters,
178 externals,
179 );
180 },
181 seen_tees,
182 )
183 }
184
185 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
186 self.transform_children(
187 |n, s| {
188 n.connect_network(s);
189 },
190 seen_tees,
191 )
192 }
193
194 pub fn transform_bottom_up(
195 &mut self,
196 transform_leaf: &mut impl FnMut(&mut HydroLeaf),
197 transform_node: &mut impl FnMut(&mut HydroNode),
198 seen_tees: &mut SeenTees,
199 ) {
200 self.transform_children(|n, s| n.transform_bottom_up(transform_node, s), seen_tees);
201
202 transform_leaf(self);
203 }
204
205 pub fn transform_children(
206 &mut self,
207 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
208 seen_tees: &mut SeenTees,
209 ) {
210 match self {
211 HydroLeaf::ForEach { f: _, input, .. }
212 | HydroLeaf::DestSink { sink: _, input, .. }
213 | HydroLeaf::CycleSink {
214 ident: _,
215 location_kind: _,
216 input,
217 ..
218 } => {
219 transform(input, seen_tees);
220 }
221 }
222 }
223
224 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroLeaf {
225 match self {
226 HydroLeaf::ForEach { f, input, metadata } => HydroLeaf::ForEach {
227 f: f.clone(),
228 input: Box::new(input.deep_clone(seen_tees)),
229 metadata: metadata.clone(),
230 },
231 HydroLeaf::DestSink {
232 sink,
233 input,
234 metadata,
235 } => HydroLeaf::DestSink {
236 sink: sink.clone(),
237 input: Box::new(input.deep_clone(seen_tees)),
238 metadata: metadata.clone(),
239 },
240 HydroLeaf::CycleSink {
241 ident,
242 location_kind,
243 input,
244 metadata,
245 } => HydroLeaf::CycleSink {
246 ident: ident.clone(),
247 location_kind: location_kind.clone(),
248 input: Box::new(input.deep_clone(seen_tees)),
249 metadata: metadata.clone(),
250 },
251 }
252 }
253
254 #[cfg(feature = "build")]
255 pub fn emit(
256 &mut self,
257 graph_builders: &mut BTreeMap<usize, FlatGraphBuilder>,
258 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
259 next_stmt_id: &mut usize,
260 ) {
261 self.emit_core(
262 &mut BuildersOrCallback::Builders::<
263 fn(&mut HydroLeaf, &mut usize),
264 fn(&mut HydroNode, &mut usize),
265 >(graph_builders),
266 built_tees,
267 next_stmt_id,
268 );
269 }
270
271 #[cfg(feature = "build")]
272 pub fn emit_core(
273 &mut self,
274 builders_or_callback: &mut BuildersOrCallback<
275 impl FnMut(&mut HydroLeaf, &mut usize),
276 impl FnMut(&mut HydroNode, &mut usize),
277 >,
278 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
279 next_stmt_id: &mut usize,
280 ) {
281 match self {
282 HydroLeaf::ForEach { f, input, .. } => {
283 let (input_ident, input_location_id) =
284 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
285
286 match builders_or_callback {
287 BuildersOrCallback::Builders(graph_builders) => {
288 graph_builders
289 .entry(input_location_id)
290 .or_default()
291 .add_dfir(
292 parse_quote! {
293 #input_ident -> for_each(#f);
294 },
295 None,
296 Some(&next_stmt_id.to_string()),
297 );
298 }
299 BuildersOrCallback::Callback(leaf_callback, _) => {
300 leaf_callback(self, next_stmt_id);
301 }
302 }
303
304 *next_stmt_id += 1;
305 }
306
307 HydroLeaf::DestSink { sink, input, .. } => {
308 let (input_ident, input_location_id) =
309 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
310
311 match builders_or_callback {
312 BuildersOrCallback::Builders(graph_builders) => {
313 graph_builders
314 .entry(input_location_id)
315 .or_default()
316 .add_dfir(
317 parse_quote! {
318 #input_ident -> dest_sink(#sink);
319 },
320 None,
321 Some(&next_stmt_id.to_string()),
322 );
323 }
324 BuildersOrCallback::Callback(leaf_callback, _) => {
325 leaf_callback(self, next_stmt_id);
326 }
327 }
328
329 *next_stmt_id += 1;
330 }
331
332 HydroLeaf::CycleSink {
333 ident,
334 location_kind,
335 input,
336 ..
337 } => {
338 let (input_ident, input_location_id) =
339 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
340
341 let location_id = match location_kind.root() {
342 LocationId::Process(id) => id,
343 LocationId::Cluster(id) => id,
344 LocationId::Tick(_, _) => panic!(),
345 LocationId::ExternalProcess(_) => panic!(),
346 };
347
348 assert_eq!(
349 input_location_id, *location_id,
350 "cycle_sink location mismatch"
351 );
352
353 match builders_or_callback {
354 BuildersOrCallback::Builders(graph_builders) => {
355 graph_builders.entry(*location_id).or_default().add_dfir(
356 parse_quote! {
357 #ident = #input_ident;
358 },
359 None,
360 None,
361 );
362 }
363 BuildersOrCallback::Callback(_, _) => {}
365 }
366 }
367 }
368 }
369
370 pub fn metadata(&self) -> &HydroIrMetadata {
371 match self {
372 HydroLeaf::ForEach { metadata, .. }
373 | HydroLeaf::DestSink { metadata, .. }
374 | HydroLeaf::CycleSink { metadata, .. } => metadata,
375 }
376 }
377
378 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
379 match self {
380 HydroLeaf::ForEach { metadata, .. }
381 | HydroLeaf::DestSink { metadata, .. }
382 | HydroLeaf::CycleSink { metadata, .. } => metadata,
383 }
384 }
385
386 pub fn print_root(&self) -> String {
387 match self {
388 HydroLeaf::ForEach { f, .. } => format!("ForEach({:?})", f),
389 HydroLeaf::DestSink { sink, .. } => format!("DestSink({:?})", sink),
390 HydroLeaf::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
391 }
392 }
393}
394
/// Emits every leaf of `ir` and returns the resulting graph builders.
///
/// All leaves share one tee table and one statement id counter, which starts at 0.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroLeaf>) -> BTreeMap<usize, FlatGraphBuilder> {
    let mut graph_builders = BTreeMap::new();
    let mut tee_idents = HashMap::new();
    let mut stmt_counter = 0;

    ir.iter_mut().for_each(|leaf| {
        leaf.emit(&mut graph_builders, &mut tee_idents, &mut stmt_counter);
    });

    graph_builders
}
405
/// Walks `ir` with `emit_core` in callback mode.
///
/// `transform_leaf` and `transform_node` are handed each visited item together with the
/// statement id counter, which starts at 0 and is shared across all leaves.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroLeaf],
    transform_leaf: impl FnMut(&mut HydroLeaf, &mut usize),
    transform_node: impl FnMut(&mut HydroNode, &mut usize),
) {
    let mut visited_tees = HashMap::new();
    let mut stmt_counter = 0;
    let mut callbacks = BuildersOrCallback::Callback(transform_leaf, transform_node);

    for leaf in ir.iter_mut() {
        leaf.emit_core(&mut callbacks, &mut visited_tees, &mut stmt_counter);
    }
}
419
420pub fn transform_bottom_up(
421 ir: &mut [HydroLeaf],
422 transform_leaf: &mut impl FnMut(&mut HydroLeaf),
423 transform_node: &mut impl FnMut(&mut HydroNode),
424) {
425 let mut seen_tees = HashMap::new();
426 ir.iter_mut().for_each(|leaf| {
427 leaf.transform_bottom_up(transform_leaf, transform_node, &mut seen_tees);
428 });
429}
430
431pub fn deep_clone(ir: &[HydroLeaf]) -> Vec<HydroLeaf> {
432 let mut seen_tees = HashMap::new();
433 ir.iter()
434 .map(|leaf| leaf.deep_clone(&mut seen_tees))
435 .collect()
436}
437
/// State used by `TeeNode`'s `Debug` output: `None` when deduplication is off, otherwise
/// `(next id to hand out, ids already assigned keyed by the tee's cell address)`.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    // Per-thread; starts out as `None`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
}
442
443pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
444 PRINTED_TEES.with(|printed_tees| {
445 let mut printed_tees_mut = printed_tees.borrow_mut();
446 *printed_tees_mut = Some((0, HashMap::new()));
447 drop(printed_tees_mut);
448
449 let ret = f();
450
451 let mut printed_tees_mut = printed_tees.borrow_mut();
452 *printed_tees_mut = None;
453
454 ret
455 })
456}
457
/// Shared, interior-mutable handle to the node feeding a `HydroNode::Tee`.
///
/// Code in this file identifies a tee by the address of the `RefCell` behind the `Rc`.
pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
459
460impl Debug for TeeNode {
461 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
462 PRINTED_TEES.with(|printed_tees| {
463 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
464 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
465
466 if let Some(printed_tees_mut) = printed_tees_mut {
467 if let Some(existing) = printed_tees_mut
468 .1
469 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
470 {
471 write!(f, "<tee {}>", existing)
472 } else {
473 let next_id = printed_tees_mut.0;
474 printed_tees_mut.0 += 1;
475 printed_tees_mut
476 .1
477 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
478 drop(printed_tees_mut_borrow);
479 write!(f, "<tee {}>: ", next_id)?;
480 Debug::fmt(&self.0.borrow(), f)
481 }
482 } else {
483 drop(printed_tees_mut_borrow);
484 write!(f, "<tee>: ")?;
485 Debug::fmt(&self.0.borrow(), f)
486 }
487 })
488 }
489}
490
491impl Hash for TeeNode {
492 fn hash<H: Hasher>(&self, state: &mut H) {
493 self.0.borrow_mut().hash(state);
494 }
495}
496
/// Side information attached to every `HydroLeaf` and to most `HydroNode` variants.
///
/// In this file the `Hash` and `PartialEq` impls for this type ignore all fields.
#[derive(Debug, Clone)]
pub struct HydroIrMetadata {
    /// Location associated with the operator.
    pub location_kind: LocationId,
    /// Presumably the operator's output type when known — TODO confirm against producers.
    pub output_type: Option<DebugType>,
    /// NOTE(review): looks like an optional element-count measurement; the unit and source
    /// are not visible here.
    pub cardinality: Option<usize>,
    /// NOTE(review): looks like an optional CPU usage measurement; the unit and source
    /// are not visible here.
    pub cpu_usage: Option<f64>,
}
504
impl Hash for HydroIrMetadata {
    /// Feeds nothing into the hasher, so metadata never influences the hash of an IR tree.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
509
impl PartialEq for HydroIrMetadata {
    /// Always `true`: any two metadata values compare equal, whatever their fields hold.
    fn eq(&self, _: &Self) -> bool {
        true
    }
}
515
// Marker impl; the accompanying `PartialEq::eq` in this file returns `true` unconditionally.
impl Eq for HydroIrMetadata {}
517
/// An intermediate operator of the IR.
///
/// Apart from `Placeholder`, every variant carries a `HydroIrMetadata`. Child nodes are
/// owned through `Box`, except for `Tee`, whose input is shared through a `TeeNode`.
#[allow(clippy::allow_attributes, reason = "Only triggered on nightly.")]
#[allow(clippy::large_enum_variant, reason = "TODO(mingwei):")]
#[derive(Debug, Hash)]
pub enum HydroNode {
    /// Temporary stand-in written into a tee's cell while its real contents are being
    /// transformed or cloned. `transform_children` and `emit_core` panic on it.
    Placeholder,

    /// An input to the graph; see `HydroSource` for the supported kinds.
    Source {
        source: HydroSource,
        location_kind: LocationId,
        metadata: HydroIrMetadata,
    },

    /// Reads from the identifier `ident`; `emit_core` returns that identifier and adds no
    /// statement for it.
    CycleSource {
        ident: syn::Ident,
        location_kind: LocationId,
        metadata: HydroIrMetadata,
    },

    /// A shared input. The first emission produces `<ident> = <inner> -> tee()`; later
    /// emissions of the same tee reuse the recorded identifier.
    Tee {
        inner: TeeNode,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `<inner> -> persist::<'static>()`.
    Persist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Never emitted: `emit_core` panics on it, describing it as a marker node that should
    /// have been optimized away.
    Unpersist {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `<inner> -> multiset_delta()`.
    Delta {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Emitted as `chain()` with `first` on port 0 and `second` on port 1; both inputs must
    /// be at the same location.
    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input operator (`left`, `right`).
    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input operator (`left`, `right`); both inputs must be at the same location.
    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input operator (`left`, `right`).
    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input operator (`pos`, `neg`).
    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Two-input operator (`pos`, `neg`).
    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input operator parameterized by the expression `f`.
    Map {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator parameterized by the expression `f`.
    FlatMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator parameterized by the expression `f`.
    Filter {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator parameterized by the expression `f`.
    FilterMap {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input operator without parameters.
    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator with an `is_static` flag.
    Enumerate {
        is_static: bool,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator parameterized by the expression `f`.
    Inspect {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input operator without parameters.
    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input operator without parameters.
    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator parameterized by the expressions `init` and `acc`.
    Fold {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator parameterized by the expressions `init` and `acc`.
    FoldKeyed {
        init: DebugExpr,
        acc: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input operator parameterized by the expression `f`.
    Reduce {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Single-input operator parameterized by the expression `f`.
    ReduceKeyed {
        f: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Sends `input` to `to_location`. `instantiate_fn` starts as `Building`, is replaced by
    /// `compile_network`, and its stored closure is run by `connect_network`.
    Network {
        from_key: Option<usize>,
        to_location: LocationId,
        to_key: Option<usize>,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Single-input operator carrying a `tag` string and a `duration` expression.
    Counter {
        tag: String,
        duration: DebugExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
681
/// Maps the address of a tee's original cell to the cell that replaces it, so that a tee
/// reachable along several paths is transformed or cloned only once.
pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Maps the address of a tee's cell to the location recorded for it by `compile_network`.
pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
684
685impl<'a> HydroNode {
/// Instantiates every `Network` node in this subtree.
///
/// The subtree is walked bottom-up while tracking the location of the most recently
/// visited `Source`, `CycleSource`, `Network` or `Tee`. Each `Network` still in the
/// `Building` state is passed to `instantiate_network::<D>` together with that location,
/// and the result is stored as `DebugInstantiate::Finalized`.
///
/// # Panics
///
/// Panics if a `Network` is already finalized, or if a location is needed before any
/// has been recorded.
#[cfg(feature = "build")]
pub fn compile_network<D: Deploy<'a>>(
    &mut self,
    compile_env: &D::CompileEnv,
    seen_tees: &mut SeenTees,
    seen_tee_locations: &mut SeenTeeLocations,
    nodes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
    externals: &HashMap<usize, D::ExternalProcess>,
) {
    let mut curr_location = None;

    let mut visit = |node: &mut HydroNode| {
        // Step 1: finalize a network node using the location seen so far.
        if let HydroNode::Network {
            from_key,
            to_location,
            to_key,
            instantiate_fn,
            ..
        } = node
        {
            if let DebugInstantiate::Finalized(..) = instantiate_fn {
                panic!("network already finalized");
            }

            let (sink_expr, source_expr, connect_fn) = instantiate_network::<D>(
                curr_location.as_ref().unwrap(),
                *from_key,
                to_location,
                *to_key,
                nodes,
                clusters,
                externals,
                compile_env,
            );

            *instantiate_fn =
                DebugInstantiate::Finalized(sink_expr, source_expr, Some(connect_fn));
        }

        // Step 2: update the tracked location for whatever consumes this node.
        match node {
            HydroNode::Network {
                to_location: location_kind,
                ..
            }
            | HydroNode::CycleSource { location_kind, .. }
            | HydroNode::Source { location_kind, .. } => {
                curr_location = Some(match location_kind {
                    // A tick location is replaced by the location it wraps.
                    LocationId::Tick(_, tick_loc) => *tick_loc.clone(),
                    other => other.clone(),
                });
            }
            HydroNode::Tee { inner, .. } => {
                let tee_key = inner.0.as_ref() as *const RefCell<HydroNode>;
                match seen_tee_locations.get(&tee_key).cloned() {
                    // Seen before: resume from the location recorded back then.
                    Some(tee_location) => curr_location = Some(tee_location),
                    // First visit: remember where this tee's input lives.
                    None => {
                        seen_tee_locations
                            .insert(tee_key, curr_location.as_ref().unwrap().clone());
                    }
                }
            }
            _ => {}
        }
    };

    self.transform_bottom_up(&mut visit, seen_tees);
}
757
758 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
759 self.transform_bottom_up(
760 &mut |n| {
761 if let HydroNode::Network { instantiate_fn, .. } = n {
762 match instantiate_fn {
763 DebugInstantiate::Building => panic!("network not built"),
764
765 DebugInstantiate::Finalized(_, _, connect_fn) => {
766 (connect_fn.take().unwrap())();
767 }
768 }
769 }
770 },
771 seen_tees,
772 );
773 }
774
775 pub fn transform_bottom_up(
776 &mut self,
777 transform: &mut impl FnMut(&mut HydroNode),
778 seen_tees: &mut SeenTees,
779 ) {
780 self.transform_children(|n, s| n.transform_bottom_up(transform, s), seen_tees);
781
782 transform(self);
783 }
784
785 #[inline(always)]
786 pub fn transform_children(
787 &mut self,
788 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
789 seen_tees: &mut SeenTees,
790 ) {
791 match self {
792 HydroNode::Placeholder => {
793 panic!();
794 }
795
796 HydroNode::Source { .. } | HydroNode::CycleSource { .. } => {}
797
798 HydroNode::Tee { inner, .. } => {
799 if let Some(transformed) =
800 seen_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
801 {
802 *inner = TeeNode(transformed.clone());
803 } else {
804 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
805 seen_tees.insert(
806 inner.0.as_ref() as *const RefCell<HydroNode>,
807 transformed_cell.clone(),
808 );
809 let mut orig = inner.0.replace(HydroNode::Placeholder);
810 transform(&mut orig, seen_tees);
811 *transformed_cell.borrow_mut() = orig;
812 *inner = TeeNode(transformed_cell);
813 }
814 }
815
816 HydroNode::Persist { inner, .. }
817 | HydroNode::Unpersist { inner, .. }
818 | HydroNode::Delta { inner, .. } => {
819 transform(inner.as_mut(), seen_tees);
820 }
821
822 HydroNode::Chain { first, second, .. } => {
823 transform(first.as_mut(), seen_tees);
824 transform(second.as_mut(), seen_tees);
825 }
826
827 HydroNode::CrossSingleton { left, right, .. }
828 | HydroNode::CrossProduct { left, right, .. }
829 | HydroNode::Join { left, right, .. } => {
830 transform(left.as_mut(), seen_tees);
831 transform(right.as_mut(), seen_tees);
832 }
833
834 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
835 transform(pos.as_mut(), seen_tees);
836 transform(neg.as_mut(), seen_tees);
837 }
838
839 HydroNode::Map { input, .. }
840 | HydroNode::FlatMap { input, .. }
841 | HydroNode::Filter { input, .. }
842 | HydroNode::FilterMap { input, .. }
843 | HydroNode::Sort { input, .. }
844 | HydroNode::DeferTick { input, .. }
845 | HydroNode::Enumerate { input, .. }
846 | HydroNode::Inspect { input, .. }
847 | HydroNode::Unique { input, .. }
848 | HydroNode::Network { input, .. }
849 | HydroNode::Fold { input, .. }
850 | HydroNode::FoldKeyed { input, .. }
851 | HydroNode::Reduce { input, .. }
852 | HydroNode::ReduceKeyed { input, .. }
853 | HydroNode::Counter { input, .. } => {
854 transform(input.as_mut(), seen_tees);
855 }
856 }
857 }
858
859 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
860 match self {
861 HydroNode::Placeholder => HydroNode::Placeholder,
862 HydroNode::Source {
863 source,
864 location_kind,
865 metadata,
866 } => HydroNode::Source {
867 source: source.clone(),
868 location_kind: location_kind.clone(),
869 metadata: metadata.clone(),
870 },
871 HydroNode::CycleSource {
872 ident,
873 location_kind,
874 metadata,
875 } => HydroNode::CycleSource {
876 ident: ident.clone(),
877 location_kind: location_kind.clone(),
878 metadata: metadata.clone(),
879 },
880 HydroNode::Tee { inner, metadata } => {
881 if let Some(transformed) =
882 seen_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
883 {
884 HydroNode::Tee {
885 inner: TeeNode(transformed.clone()),
886 metadata: metadata.clone(),
887 }
888 } else {
889 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
890 seen_tees.insert(
891 inner.0.as_ref() as *const RefCell<HydroNode>,
892 new_rc.clone(),
893 );
894 let cloned = inner.0.borrow().deep_clone(seen_tees);
895 *new_rc.borrow_mut() = cloned;
896 HydroNode::Tee {
897 inner: TeeNode(new_rc),
898 metadata: metadata.clone(),
899 }
900 }
901 }
902 HydroNode::Persist { inner, metadata } => HydroNode::Persist {
903 inner: Box::new(inner.deep_clone(seen_tees)),
904 metadata: metadata.clone(),
905 },
906 HydroNode::Unpersist { inner, metadata } => HydroNode::Unpersist {
907 inner: Box::new(inner.deep_clone(seen_tees)),
908 metadata: metadata.clone(),
909 },
910 HydroNode::Delta { inner, metadata } => HydroNode::Delta {
911 inner: Box::new(inner.deep_clone(seen_tees)),
912 metadata: metadata.clone(),
913 },
914 HydroNode::Chain {
915 first,
916 second,
917 metadata,
918 } => HydroNode::Chain {
919 first: Box::new(first.deep_clone(seen_tees)),
920 second: Box::new(second.deep_clone(seen_tees)),
921 metadata: metadata.clone(),
922 },
923 HydroNode::CrossProduct {
924 left,
925 right,
926 metadata,
927 } => HydroNode::CrossProduct {
928 left: Box::new(left.deep_clone(seen_tees)),
929 right: Box::new(right.deep_clone(seen_tees)),
930 metadata: metadata.clone(),
931 },
932 HydroNode::CrossSingleton {
933 left,
934 right,
935 metadata,
936 } => HydroNode::CrossSingleton {
937 left: Box::new(left.deep_clone(seen_tees)),
938 right: Box::new(right.deep_clone(seen_tees)),
939 metadata: metadata.clone(),
940 },
941 HydroNode::Join {
942 left,
943 right,
944 metadata,
945 } => HydroNode::Join {
946 left: Box::new(left.deep_clone(seen_tees)),
947 right: Box::new(right.deep_clone(seen_tees)),
948 metadata: metadata.clone(),
949 },
950 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
951 pos: Box::new(pos.deep_clone(seen_tees)),
952 neg: Box::new(neg.deep_clone(seen_tees)),
953 metadata: metadata.clone(),
954 },
955 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
956 pos: Box::new(pos.deep_clone(seen_tees)),
957 neg: Box::new(neg.deep_clone(seen_tees)),
958 metadata: metadata.clone(),
959 },
960 HydroNode::Map { f, input, metadata } => HydroNode::Map {
961 f: f.clone(),
962 input: Box::new(input.deep_clone(seen_tees)),
963 metadata: metadata.clone(),
964 },
965 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
966 f: f.clone(),
967 input: Box::new(input.deep_clone(seen_tees)),
968 metadata: metadata.clone(),
969 },
970 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
971 f: f.clone(),
972 input: Box::new(input.deep_clone(seen_tees)),
973 metadata: metadata.clone(),
974 },
975 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
976 f: f.clone(),
977 input: Box::new(input.deep_clone(seen_tees)),
978 metadata: metadata.clone(),
979 },
980 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
981 input: Box::new(input.deep_clone(seen_tees)),
982 metadata: metadata.clone(),
983 },
984 HydroNode::Enumerate {
985 is_static,
986 input,
987 metadata,
988 } => HydroNode::Enumerate {
989 is_static: *is_static,
990 input: Box::new(input.deep_clone(seen_tees)),
991 metadata: metadata.clone(),
992 },
993 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
994 f: f.clone(),
995 input: Box::new(input.deep_clone(seen_tees)),
996 metadata: metadata.clone(),
997 },
998 HydroNode::Unique { input, metadata } => HydroNode::Unique {
999 input: Box::new(input.deep_clone(seen_tees)),
1000 metadata: metadata.clone(),
1001 },
1002 HydroNode::Sort { input, metadata } => HydroNode::Sort {
1003 input: Box::new(input.deep_clone(seen_tees)),
1004 metadata: metadata.clone(),
1005 },
1006 HydroNode::Fold {
1007 init,
1008 acc,
1009 input,
1010 metadata,
1011 } => HydroNode::Fold {
1012 init: init.clone(),
1013 acc: acc.clone(),
1014 input: Box::new(input.deep_clone(seen_tees)),
1015 metadata: metadata.clone(),
1016 },
1017 HydroNode::FoldKeyed {
1018 init,
1019 acc,
1020 input,
1021 metadata,
1022 } => HydroNode::FoldKeyed {
1023 init: init.clone(),
1024 acc: acc.clone(),
1025 input: Box::new(input.deep_clone(seen_tees)),
1026 metadata: metadata.clone(),
1027 },
1028 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
1029 f: f.clone(),
1030 input: Box::new(input.deep_clone(seen_tees)),
1031 metadata: metadata.clone(),
1032 },
1033 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
1034 f: f.clone(),
1035 input: Box::new(input.deep_clone(seen_tees)),
1036 metadata: metadata.clone(),
1037 },
1038 HydroNode::Network {
1039 from_key,
1040 to_location,
1041 to_key,
1042 serialize_fn,
1043 instantiate_fn,
1044 deserialize_fn,
1045 input,
1046 metadata,
1047 } => HydroNode::Network {
1048 from_key: *from_key,
1049 to_location: to_location.clone(),
1050 to_key: *to_key,
1051 serialize_fn: serialize_fn.clone(),
1052 instantiate_fn: instantiate_fn.clone(),
1053 deserialize_fn: deserialize_fn.clone(),
1054 input: Box::new(input.deep_clone(seen_tees)),
1055 metadata: metadata.clone(),
1056 },
1057 HydroNode::Counter {
1058 tag,
1059 duration,
1060 input,
1061 metadata,
1062 } => HydroNode::Counter {
1063 tag: tag.clone(),
1064 duration: duration.clone(),
1065 input: Box::new(input.deep_clone(seen_tees)),
1066 metadata: metadata.clone(),
1067 },
1068 }
1069 }
1070
1071 #[cfg(feature = "build")]
1072 pub fn emit_core(
1073 &mut self,
1074 builders_or_callback: &mut BuildersOrCallback<
1075 impl FnMut(&mut HydroLeaf, &mut usize),
1076 impl FnMut(&mut HydroNode, &mut usize),
1077 >,
1078 built_tees: &mut HashMap<*const RefCell<HydroNode>, (syn::Ident, usize)>,
1079 next_stmt_id: &mut usize,
1080 ) -> (syn::Ident, usize) {
1081 match self {
1082 HydroNode::Placeholder => {
1083 panic!()
1084 }
1085
1086 HydroNode::Persist { inner, .. } => {
1087 let (inner_ident, location) =
1088 inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1089
1090 let persist_ident =
1091 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1092
1093 match builders_or_callback {
1094 BuildersOrCallback::Builders(graph_builders) => {
1095 let builder = graph_builders.entry(location).or_default();
1096 builder.add_dfir(
1097 parse_quote! {
1098 #persist_ident = #inner_ident -> persist::<'static>();
1099 },
1100 None,
1101 Some(&next_stmt_id.to_string()),
1102 );
1103 }
1104 BuildersOrCallback::Callback(_, node_callback) => {
1105 node_callback(self, next_stmt_id);
1106 }
1107 }
1108
1109 *next_stmt_id += 1;
1110
1111 (persist_ident, location)
1112 }
1113
1114 HydroNode::Unpersist { .. } => {
1115 panic!(
1116 "Unpersist is a marker node and should have been optimized away. This is likely a compiler bug."
1117 )
1118 }
1119
1120 HydroNode::Delta { inner, .. } => {
1121 let (inner_ident, location) =
1122 inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1123
1124 let delta_ident =
1125 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1126
1127 match builders_or_callback {
1128 BuildersOrCallback::Builders(graph_builders) => {
1129 let builder = graph_builders.entry(location).or_default();
1130 builder.add_dfir(
1131 parse_quote! {
1132 #delta_ident = #inner_ident -> multiset_delta();
1133 },
1134 None,
1135 Some(&next_stmt_id.to_string()),
1136 );
1137 }
1138 BuildersOrCallback::Callback(_, node_callback) => {
1139 node_callback(self, next_stmt_id);
1140 }
1141 }
1142
1143 *next_stmt_id += 1;
1144
1145 (delta_ident, location)
1146 }
1147
1148 HydroNode::Source {
1149 source,
1150 location_kind,
1151 ..
1152 } => {
1153 let location_id = match location_kind.clone() {
1154 LocationId::Process(id) => id,
1155 LocationId::Cluster(id) => id,
1156 LocationId::Tick(_, _) => panic!(),
1157 LocationId::ExternalProcess(id) => id,
1158 };
1159
1160 if let HydroSource::ExternalNetwork() = source {
1161 (syn::Ident::new("DUMMY", Span::call_site()), location_id)
1162 } else {
1163 let source_ident =
1164 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1165
1166 let source_stmt = match source {
1167 HydroSource::Stream(expr) => {
1168 parse_quote! {
1169 #source_ident = source_stream(#expr);
1170 }
1171 }
1172
1173 HydroSource::ExternalNetwork() => {
1174 unreachable!()
1175 }
1176
1177 HydroSource::Iter(expr) => {
1178 parse_quote! {
1179 #source_ident = source_iter(#expr);
1180 }
1181 }
1182
1183 HydroSource::Spin() => {
1184 parse_quote! {
1185 #source_ident = spin();
1186 }
1187 }
1188 };
1189
1190 match builders_or_callback {
1191 BuildersOrCallback::Builders(graph_builders) => {
1192 let builder = graph_builders.entry(location_id).or_default();
1193 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
1194 }
1195 BuildersOrCallback::Callback(_, node_callback) => {
1196 node_callback(self, next_stmt_id);
1197 }
1198 }
1199
1200 *next_stmt_id += 1;
1201
1202 (source_ident, location_id)
1203 }
1204 }
1205
1206 HydroNode::CycleSource {
1207 ident,
1208 location_kind,
1209 ..
1210 } => {
1211 let location_id = *match location_kind.root() {
1212 LocationId::Process(id) => id,
1213 LocationId::Cluster(id) => id,
1214 LocationId::Tick(_, _) => panic!(),
1215 LocationId::ExternalProcess(_) => panic!(),
1216 };
1217
1218 let ident = ident.clone();
1219
1220 match builders_or_callback {
1221 BuildersOrCallback::Builders(_) => {}
1222 BuildersOrCallback::Callback(_, node_callback) => {
1223 node_callback(self, next_stmt_id);
1224 }
1225 }
1226
1227 *next_stmt_id += 1;
1229
1230 (ident, location_id)
1231 }
1232
1233 HydroNode::Tee { inner, .. } => {
1234 let (ret_ident, inner_location_id) = if let Some((teed_from, inner_location_id)) =
1235 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
1236 {
1237 match builders_or_callback {
1238 BuildersOrCallback::Builders(_) => {}
1239 BuildersOrCallback::Callback(_, node_callback) => {
1240 node_callback(self, next_stmt_id);
1241 }
1242 }
1243
1244 (teed_from.clone(), *inner_location_id)
1245 } else {
1246 let (inner_ident, inner_location_id) = inner.0.borrow_mut().emit_core(
1247 builders_or_callback,
1248 built_tees,
1249 next_stmt_id,
1250 );
1251
1252 let tee_ident =
1253 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1254
1255 built_tees.insert(
1256 inner.0.as_ref() as *const RefCell<HydroNode>,
1257 (tee_ident.clone(), inner_location_id),
1258 );
1259
1260 match builders_or_callback {
1261 BuildersOrCallback::Builders(graph_builders) => {
1262 let builder = graph_builders.entry(inner_location_id).or_default();
1263 builder.add_dfir(
1264 parse_quote! {
1265 #tee_ident = #inner_ident -> tee();
1266 },
1267 None,
1268 Some(&next_stmt_id.to_string()),
1269 );
1270 }
1271 BuildersOrCallback::Callback(_, node_callback) => {
1272 node_callback(self, next_stmt_id);
1273 }
1274 }
1275
1276 (tee_ident, inner_location_id)
1277 };
1278
1279 *next_stmt_id += 1;
1283 (ret_ident, inner_location_id)
1284 }
1285
1286 HydroNode::Chain { first, second, .. } => {
1287 let (first_ident, first_location_id) =
1288 first.emit_core(builders_or_callback, built_tees, next_stmt_id);
1289 let (second_ident, second_location_id) =
1290 second.emit_core(builders_or_callback, built_tees, next_stmt_id);
1291
1292 assert_eq!(
1293 first_location_id, second_location_id,
1294 "chain inputs must be in the same location"
1295 );
1296
1297 let chain_ident =
1298 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1299
1300 match builders_or_callback {
1301 BuildersOrCallback::Builders(graph_builders) => {
1302 let builder = graph_builders.entry(first_location_id).or_default();
1303 builder.add_dfir(
1304 parse_quote! {
1305 #chain_ident = chain();
1306 #first_ident -> [0]#chain_ident;
1307 #second_ident -> [1]#chain_ident;
1308 },
1309 None,
1310 Some(&next_stmt_id.to_string()),
1311 );
1312 }
1313 BuildersOrCallback::Callback(_, node_callback) => {
1314 node_callback(self, next_stmt_id);
1315 }
1316 }
1317
1318 *next_stmt_id += 1;
1319
1320 (chain_ident, first_location_id)
1321 }
1322
1323 HydroNode::CrossSingleton { left, right, .. } => {
1324 let (left_ident, left_location_id) =
1325 left.emit_core(builders_or_callback, built_tees, next_stmt_id);
1326 let (right_ident, right_location_id) =
1327 right.emit_core(builders_or_callback, built_tees, next_stmt_id);
1328
1329 assert_eq!(
1330 left_location_id, right_location_id,
1331 "cross_singleton inputs must be in the same location"
1332 );
1333
1334 let cross_ident =
1335 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1336
1337 match builders_or_callback {
1338 BuildersOrCallback::Builders(graph_builders) => {
1339 let builder = graph_builders.entry(left_location_id).or_default();
1340 builder.add_dfir(
1341 parse_quote! {
1342 #cross_ident = cross_singleton();
1343 #left_ident -> [input]#cross_ident;
1344 #right_ident -> [single]#cross_ident;
1345 },
1346 None,
1347 Some(&next_stmt_id.to_string()),
1348 );
1349 }
1350 BuildersOrCallback::Callback(_, node_callback) => {
1351 node_callback(self, next_stmt_id);
1352 }
1353 }
1354
1355 *next_stmt_id += 1;
1356
1357 (cross_ident, left_location_id)
1358 }
1359
1360 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
1361 let operator: syn::Ident = if matches!(self, HydroNode::CrossProduct { .. }) {
1362 parse_quote!(cross_join_multiset)
1363 } else {
1364 parse_quote!(join_multiset)
1365 };
1366
1367 let (HydroNode::CrossProduct { left, right, .. }
1368 | HydroNode::Join { left, right, .. }) = self
1369 else {
1370 unreachable!()
1371 };
1372
1373 let (left_inner, left_lifetime) =
1374 if let HydroNode::Persist { inner: left, .. } = left.as_mut() {
1375 (left, quote!('static))
1376 } else {
1377 (left, quote!('tick))
1378 };
1379
1380 let (right_inner, right_lifetime) =
1381 if let HydroNode::Persist { inner: right, .. } = right.as_mut() {
1382 (right, quote!('static))
1383 } else {
1384 (right, quote!('tick))
1385 };
1386
1387 let (left_ident, left_location_id) =
1388 left_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1389 let (right_ident, right_location_id) =
1390 right_inner.emit_core(builders_or_callback, built_tees, next_stmt_id);
1391
1392 assert_eq!(
1393 left_location_id, right_location_id,
1394 "join / cross product inputs must be in the same location"
1395 );
1396
1397 let stream_ident =
1398 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1399
1400 match builders_or_callback {
1401 BuildersOrCallback::Builders(graph_builders) => {
1402 let builder = graph_builders.entry(left_location_id).or_default();
1403 builder.add_dfir(
1404 parse_quote! {
1405 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
1406 #left_ident -> [0]#stream_ident;
1407 #right_ident -> [1]#stream_ident;
1408 },
1409 None,
1410 Some(&next_stmt_id.to_string()),
1411 );
1412 }
1413 BuildersOrCallback::Callback(_, node_callback) => {
1414 node_callback(self, next_stmt_id);
1415 }
1416 }
1417
1418 *next_stmt_id += 1;
1419
1420 (stream_ident, left_location_id)
1421 }
1422
1423 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
1424 let operator: syn::Ident = if matches!(self, HydroNode::Difference { .. }) {
1425 parse_quote!(difference_multiset)
1426 } else {
1427 parse_quote!(anti_join_multiset)
1428 };
1429
1430 let (HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. }) =
1431 self
1432 else {
1433 unreachable!()
1434 };
1435
1436 let (neg, neg_lifetime) =
1437 if let HydroNode::Persist { inner: neg, .. } = neg.as_mut() {
1438 (neg, quote!('static))
1439 } else {
1440 (neg, quote!('tick))
1441 };
1442
1443 let (pos_ident, pos_location_id) =
1444 pos.emit_core(builders_or_callback, built_tees, next_stmt_id);
1445 let (neg_ident, neg_location_id) =
1446 neg.emit_core(builders_or_callback, built_tees, next_stmt_id);
1447
1448 assert_eq!(
1449 pos_location_id, neg_location_id,
1450 "difference / anti join inputs must be in the same location"
1451 );
1452
1453 let stream_ident =
1454 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1455
1456 match builders_or_callback {
1457 BuildersOrCallback::Builders(graph_builders) => {
1458 let builder = graph_builders.entry(pos_location_id).or_default();
1459 builder.add_dfir(
1460 parse_quote! {
1461 #stream_ident = #operator::<'tick, #neg_lifetime>();
1462 #pos_ident -> [pos]#stream_ident;
1463 #neg_ident -> [neg]#stream_ident;
1464 },
1465 None,
1466 Some(&next_stmt_id.to_string()),
1467 );
1468 }
1469 BuildersOrCallback::Callback(_, node_callback) => {
1470 node_callback(self, next_stmt_id);
1471 }
1472 }
1473
1474 *next_stmt_id += 1;
1475
1476 (stream_ident, pos_location_id)
1477 }
1478
1479 HydroNode::Map { f, input, .. } => {
1480 let (input_ident, input_location_id) =
1481 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1482
1483 let map_ident =
1484 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1485
1486 match builders_or_callback {
1487 BuildersOrCallback::Builders(graph_builders) => {
1488 let builder = graph_builders.entry(input_location_id).or_default();
1489 builder.add_dfir(
1490 parse_quote! {
1491 #map_ident = #input_ident -> map(#f);
1492 },
1493 None,
1494 Some(&next_stmt_id.to_string()),
1495 );
1496 }
1497 BuildersOrCallback::Callback(_, node_callback) => {
1498 node_callback(self, next_stmt_id);
1499 }
1500 }
1501
1502 *next_stmt_id += 1;
1503
1504 (map_ident, input_location_id)
1505 }
1506
1507 HydroNode::FlatMap { f, input, .. } => {
1508 let (input_ident, input_location_id) =
1509 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1510
1511 let flat_map_ident =
1512 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1513
1514 match builders_or_callback {
1515 BuildersOrCallback::Builders(graph_builders) => {
1516 let builder = graph_builders.entry(input_location_id).or_default();
1517 builder.add_dfir(
1518 parse_quote! {
1519 #flat_map_ident = #input_ident -> flat_map(#f);
1520 },
1521 None,
1522 Some(&next_stmt_id.to_string()),
1523 );
1524 }
1525 BuildersOrCallback::Callback(_, node_callback) => {
1526 node_callback(self, next_stmt_id);
1527 }
1528 }
1529
1530 *next_stmt_id += 1;
1531
1532 (flat_map_ident, input_location_id)
1533 }
1534
1535 HydroNode::Filter { f, input, .. } => {
1536 let (input_ident, input_location_id) =
1537 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1538
1539 let filter_ident =
1540 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1541
1542 match builders_or_callback {
1543 BuildersOrCallback::Builders(graph_builders) => {
1544 let builder = graph_builders.entry(input_location_id).or_default();
1545 builder.add_dfir(
1546 parse_quote! {
1547 #filter_ident = #input_ident -> filter(#f);
1548 },
1549 None,
1550 Some(&next_stmt_id.to_string()),
1551 );
1552 }
1553 BuildersOrCallback::Callback(_, node_callback) => {
1554 node_callback(self, next_stmt_id);
1555 }
1556 }
1557
1558 *next_stmt_id += 1;
1559
1560 (filter_ident, input_location_id)
1561 }
1562
1563 HydroNode::FilterMap { f, input, .. } => {
1564 let (input_ident, input_location_id) =
1565 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1566
1567 let filter_map_ident =
1568 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1569
1570 match builders_or_callback {
1571 BuildersOrCallback::Builders(graph_builders) => {
1572 let builder = graph_builders.entry(input_location_id).or_default();
1573 builder.add_dfir(
1574 parse_quote! {
1575 #filter_map_ident = #input_ident -> filter_map(#f);
1576 },
1577 None,
1578 Some(&next_stmt_id.to_string()),
1579 );
1580 }
1581 BuildersOrCallback::Callback(_, node_callback) => {
1582 node_callback(self, next_stmt_id);
1583 }
1584 }
1585
1586 *next_stmt_id += 1;
1587
1588 (filter_map_ident, input_location_id)
1589 }
1590
1591 HydroNode::Sort { input, .. } => {
1592 let (input_ident, input_location_id) =
1593 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1594
1595 let sort_ident =
1596 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1597
1598 match builders_or_callback {
1599 BuildersOrCallback::Builders(graph_builders) => {
1600 let builder = graph_builders.entry(input_location_id).or_default();
1601 builder.add_dfir(
1602 parse_quote! {
1603 #sort_ident = #input_ident -> sort();
1604 },
1605 None,
1606 Some(&next_stmt_id.to_string()),
1607 );
1608 }
1609 BuildersOrCallback::Callback(_, node_callback) => {
1610 node_callback(self, next_stmt_id);
1611 }
1612 }
1613
1614 *next_stmt_id += 1;
1615
1616 (sort_ident, input_location_id)
1617 }
1618
1619 HydroNode::DeferTick { input, .. } => {
1620 let (input_ident, input_location_id) =
1621 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1622
1623 let defer_tick_ident =
1624 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1625
1626 match builders_or_callback {
1627 BuildersOrCallback::Builders(graph_builders) => {
1628 let builder = graph_builders.entry(input_location_id).or_default();
1629 builder.add_dfir(
1630 parse_quote! {
1631 #defer_tick_ident = #input_ident -> defer_tick_lazy();
1632 },
1633 None,
1634 Some(&next_stmt_id.to_string()),
1635 );
1636 }
1637 BuildersOrCallback::Callback(_, node_callback) => {
1638 node_callback(self, next_stmt_id);
1639 }
1640 }
1641
1642 *next_stmt_id += 1;
1643
1644 (defer_tick_ident, input_location_id)
1645 }
1646
1647 HydroNode::Enumerate {
1648 is_static, input, ..
1649 } => {
1650 let (input_ident, input_location_id) =
1651 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1652
1653 let enumerate_ident =
1654 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1655
1656 match builders_or_callback {
1657 BuildersOrCallback::Builders(graph_builders) => {
1658 let builder = graph_builders.entry(input_location_id).or_default();
1659 let lifetime = if *is_static {
1660 quote!('static)
1661 } else {
1662 quote!('tick)
1663 };
1664 builder.add_dfir(
1665 parse_quote! {
1666 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
1667 },
1668 None,
1669 Some(&next_stmt_id.to_string()),
1670 );
1671 }
1672 BuildersOrCallback::Callback(_, node_callback) => {
1673 node_callback(self, next_stmt_id);
1674 }
1675 }
1676
1677 *next_stmt_id += 1;
1678
1679 (enumerate_ident, input_location_id)
1680 }
1681
1682 HydroNode::Inspect { f, input, .. } => {
1683 let (input_ident, input_location_id) =
1684 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1685
1686 let inspect_ident =
1687 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1688
1689 match builders_or_callback {
1690 BuildersOrCallback::Builders(graph_builders) => {
1691 let builder = graph_builders.entry(input_location_id).or_default();
1692 builder.add_dfir(
1693 parse_quote! {
1694 #inspect_ident = #input_ident -> inspect(#f);
1695 },
1696 None,
1697 Some(&next_stmt_id.to_string()),
1698 );
1699 }
1700 BuildersOrCallback::Callback(_, node_callback) => {
1701 node_callback(self, next_stmt_id);
1702 }
1703 }
1704
1705 *next_stmt_id += 1;
1706
1707 (inspect_ident, input_location_id)
1708 }
1709
1710 HydroNode::Unique { input, .. } => {
1711 let (input_ident, input_location_id) =
1712 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1713
1714 let unique_ident =
1715 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1716
1717 match builders_or_callback {
1718 BuildersOrCallback::Builders(graph_builders) => {
1719 let builder = graph_builders.entry(input_location_id).or_default();
1720 builder.add_dfir(
1721 parse_quote! {
1722 #unique_ident = #input_ident -> unique::<'tick>();
1723 },
1724 None,
1725 Some(&next_stmt_id.to_string()),
1726 );
1727 }
1728 BuildersOrCallback::Callback(_, node_callback) => {
1729 node_callback(self, next_stmt_id);
1730 }
1731 }
1732
1733 *next_stmt_id += 1;
1734
1735 (unique_ident, input_location_id)
1736 }
1737
1738 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } => {
1739 let operator: syn::Ident = if matches!(self, HydroNode::Fold { .. }) {
1740 parse_quote!(fold)
1741 } else {
1742 parse_quote!(fold_keyed)
1743 };
1744
1745 let (HydroNode::Fold {
1746 init, acc, input, ..
1747 }
1748 | HydroNode::FoldKeyed {
1749 init, acc, input, ..
1750 }) = self
1751 else {
1752 unreachable!()
1753 };
1754
1755 let (input, lifetime) =
1756 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
1757 (input, quote!('static))
1758 } else {
1759 (input, quote!('tick))
1760 };
1761
1762 let (input_ident, input_location_id) =
1763 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1764
1765 let fold_ident =
1766 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1767
1768 match builders_or_callback {
1769 BuildersOrCallback::Builders(graph_builders) => {
1770 let builder = graph_builders.entry(input_location_id).or_default();
1771 builder.add_dfir(
1772 parse_quote! {
1773 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
1774 },
1775 None,
1776 Some(&next_stmt_id.to_string()),
1777 );
1778 }
1779 BuildersOrCallback::Callback(_, node_callback) => {
1780 node_callback(self, next_stmt_id);
1781 }
1782 }
1783
1784 *next_stmt_id += 1;
1785
1786 (fold_ident, input_location_id)
1787 }
1788
1789 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
1790 let operator: syn::Ident = if matches!(self, HydroNode::Reduce { .. }) {
1791 parse_quote!(reduce)
1792 } else {
1793 parse_quote!(reduce_keyed)
1794 };
1795
1796 let (HydroNode::Reduce { f, input, .. } | HydroNode::ReduceKeyed { f, input, .. }) =
1797 self
1798 else {
1799 unreachable!()
1800 };
1801
1802 let (input, lifetime) =
1803 if let HydroNode::Persist { inner: input, .. } = input.as_mut() {
1804 (input, quote!('static))
1805 } else {
1806 (input, quote!('tick))
1807 };
1808
1809 let (input_ident, input_location_id) =
1810 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1811
1812 let reduce_ident =
1813 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1814
1815 match builders_or_callback {
1816 BuildersOrCallback::Builders(graph_builders) => {
1817 let builder = graph_builders.entry(input_location_id).or_default();
1818 builder.add_dfir(
1819 parse_quote! {
1820 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
1821 },
1822 None,
1823 Some(&next_stmt_id.to_string()),
1824 );
1825 }
1826 BuildersOrCallback::Callback(_, node_callback) => {
1827 node_callback(self, next_stmt_id);
1828 }
1829 }
1830
1831 *next_stmt_id += 1;
1832
1833 (reduce_ident, input_location_id)
1834 }
1835
1836 HydroNode::Network {
1837 from_key: _,
1838 to_location,
1839 to_key: _,
1840 serialize_fn: serialize_pipeline,
1841 instantiate_fn,
1842 deserialize_fn: deserialize_pipeline,
1843 input,
1844 ..
1845 } => {
1846 let (input_ident, input_location_id) =
1847 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1848
1849 let to_id = match *to_location {
1850 LocationId::Process(id) => id,
1851 LocationId::Cluster(id) => id,
1852 LocationId::Tick(_, _) => panic!(),
1853 LocationId::ExternalProcess(id) => id,
1854 };
1855
1856 let receiver_stream_ident =
1857 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1858
1859 match builders_or_callback {
1860 BuildersOrCallback::Builders(graph_builders) => {
1861 let (sink_expr, source_expr) = match instantiate_fn {
1862 DebugInstantiate::Building => (
1863 syn::parse_quote!(DUMMY_SINK),
1864 syn::parse_quote!(DUMMY_SOURCE),
1865 ),
1866
1867 DebugInstantiate::Finalized(sink, source, _connect_fn) => {
1868 (sink.clone(), source.clone())
1869 }
1870 };
1871
1872 let sender_builder = graph_builders.entry(input_location_id).or_default();
1873 if let Some(serialize_pipeline) = serialize_pipeline {
1874 sender_builder.add_dfir(
1875 parse_quote! {
1876 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink_expr);
1877 },
1878 None,
1879 Some(&next_stmt_id.to_string()),
1880 );
1881 } else {
1882 sender_builder.add_dfir(
1883 parse_quote! {
1884 #input_ident -> dest_sink(#sink_expr);
1885 },
1886 None,
1887 Some(&next_stmt_id.to_string()),
1888 );
1889 }
1890
1891 let receiver_builder = graph_builders.entry(to_id).or_default();
1892 if let Some(deserialize_pipeline) = deserialize_pipeline {
1893 receiver_builder.add_dfir(parse_quote! {
1894 #receiver_stream_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
1895 }, None, Some(&next_stmt_id.to_string()));
1896 } else {
1897 receiver_builder.add_dfir(
1898 parse_quote! {
1899 #receiver_stream_ident = source_stream(#source_expr);
1900 },
1901 None,
1902 Some(&next_stmt_id.to_string()),
1903 );
1904 }
1905 }
1906 BuildersOrCallback::Callback(_, node_callback) => {
1907 node_callback(self, next_stmt_id);
1908 }
1909 }
1910
1911 *next_stmt_id += 1;
1912
1913 (receiver_stream_ident, to_id)
1914 }
1915
1916 HydroNode::Counter {
1917 tag,
1918 duration,
1919 input,
1920 ..
1921 } => {
1922 let (input_ident, input_location_id) =
1923 input.emit_core(builders_or_callback, built_tees, next_stmt_id);
1924
1925 let counter_ident =
1926 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
1927
1928 match builders_or_callback {
1929 BuildersOrCallback::Builders(graph_builders) => {
1930 let builder = graph_builders.entry(input_location_id).or_default();
1931 builder.add_dfir(
1932 parse_quote! {
1933 #counter_ident = #input_ident -> _counter(#tag, #duration);
1934 },
1935 None,
1936 Some(&next_stmt_id.to_string()),
1937 );
1938 }
1939 BuildersOrCallback::Callback(_, node_callback) => {
1940 node_callback(self, next_stmt_id);
1941 }
1942 }
1943
1944 *next_stmt_id += 1;
1945
1946 (counter_ident, input_location_id)
1947 }
1948 }
1949 }
1950
1951 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1952 match self {
1953 HydroNode::Placeholder => {
1954 panic!()
1955 }
1956 HydroNode::Source { source, .. } => match source {
1957 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
1958 HydroSource::ExternalNetwork() | HydroSource::Spin() => {}
1959 },
1960 HydroNode::CycleSource { .. }
1961 | HydroNode::Tee { .. }
1962 | HydroNode::Persist { .. }
1963 | HydroNode::Unpersist { .. }
1964 | HydroNode::Delta { .. }
1965 | HydroNode::Chain { .. }
1966 | HydroNode::CrossProduct { .. }
1967 | HydroNode::CrossSingleton { .. }
1968 | HydroNode::Join { .. }
1969 | HydroNode::Difference { .. }
1970 | HydroNode::AntiJoin { .. }
1971 | HydroNode::DeferTick { .. }
1972 | HydroNode::Enumerate { .. }
1973 | HydroNode::Unique { .. }
1974 | HydroNode::Sort { .. } => {}
1975 HydroNode::Map { f, .. }
1976 | HydroNode::FlatMap { f, .. }
1977 | HydroNode::Filter { f, .. }
1978 | HydroNode::FilterMap { f, .. }
1979 | HydroNode::Inspect { f, .. }
1980 | HydroNode::Reduce { f, .. }
1981 | HydroNode::ReduceKeyed { f, .. } => {
1982 transform(f);
1983 }
1984 HydroNode::Fold { init, acc, .. } | HydroNode::FoldKeyed { init, acc, .. } => {
1985 transform(init);
1986 transform(acc);
1987 }
1988 HydroNode::Network {
1989 serialize_fn,
1990 deserialize_fn,
1991 ..
1992 } => {
1993 if let Some(serialize_fn) = serialize_fn {
1994 transform(serialize_fn);
1995 }
1996 if let Some(deserialize_fn) = deserialize_fn {
1997 transform(deserialize_fn);
1998 }
1999 }
2000 HydroNode::Counter { duration, .. } => {
2001 transform(duration);
2002 }
2003 }
2004 }
2005
2006 pub fn metadata(&self) -> &HydroIrMetadata {
2007 match self {
2008 HydroNode::Placeholder => {
2009 panic!()
2010 }
2011 HydroNode::Source { metadata, .. } => metadata,
2012 HydroNode::CycleSource { metadata, .. } => metadata,
2013 HydroNode::Tee { metadata, .. } => metadata,
2014 HydroNode::Persist { metadata, .. } => metadata,
2015 HydroNode::Unpersist { metadata, .. } => metadata,
2016 HydroNode::Delta { metadata, .. } => metadata,
2017 HydroNode::Chain { metadata, .. } => metadata,
2018 HydroNode::CrossProduct { metadata, .. } => metadata,
2019 HydroNode::CrossSingleton { metadata, .. } => metadata,
2020 HydroNode::Join { metadata, .. } => metadata,
2021 HydroNode::Difference { metadata, .. } => metadata,
2022 HydroNode::AntiJoin { metadata, .. } => metadata,
2023 HydroNode::Map { metadata, .. } => metadata,
2024 HydroNode::FlatMap { metadata, .. } => metadata,
2025 HydroNode::Filter { metadata, .. } => metadata,
2026 HydroNode::FilterMap { metadata, .. } => metadata,
2027 HydroNode::DeferTick { metadata, .. } => metadata,
2028 HydroNode::Enumerate { metadata, .. } => metadata,
2029 HydroNode::Inspect { metadata, .. } => metadata,
2030 HydroNode::Unique { metadata, .. } => metadata,
2031 HydroNode::Sort { metadata, .. } => metadata,
2032 HydroNode::Fold { metadata, .. } => metadata,
2033 HydroNode::FoldKeyed { metadata, .. } => metadata,
2034 HydroNode::Reduce { metadata, .. } => metadata,
2035 HydroNode::ReduceKeyed { metadata, .. } => metadata,
2036 HydroNode::Network { metadata, .. } => metadata,
2037 HydroNode::Counter { metadata, .. } => metadata,
2038 }
2039 }
2040
2041 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
2042 match self {
2043 HydroNode::Placeholder => {
2044 panic!()
2045 }
2046 HydroNode::Source { metadata, .. } => metadata,
2047 HydroNode::CycleSource { metadata, .. } => metadata,
2048 HydroNode::Tee { metadata, .. } => metadata,
2049 HydroNode::Persist { metadata, .. } => metadata,
2050 HydroNode::Unpersist { metadata, .. } => metadata,
2051 HydroNode::Delta { metadata, .. } => metadata,
2052 HydroNode::Chain { metadata, .. } => metadata,
2053 HydroNode::CrossProduct { metadata, .. } => metadata,
2054 HydroNode::CrossSingleton { metadata, .. } => metadata,
2055 HydroNode::Join { metadata, .. } => metadata,
2056 HydroNode::Difference { metadata, .. } => metadata,
2057 HydroNode::AntiJoin { metadata, .. } => metadata,
2058 HydroNode::Map { metadata, .. } => metadata,
2059 HydroNode::FlatMap { metadata, .. } => metadata,
2060 HydroNode::Filter { metadata, .. } => metadata,
2061 HydroNode::FilterMap { metadata, .. } => metadata,
2062 HydroNode::DeferTick { metadata, .. } => metadata,
2063 HydroNode::Enumerate { metadata, .. } => metadata,
2064 HydroNode::Inspect { metadata, .. } => metadata,
2065 HydroNode::Unique { metadata, .. } => metadata,
2066 HydroNode::Sort { metadata, .. } => metadata,
2067 HydroNode::Fold { metadata, .. } => metadata,
2068 HydroNode::FoldKeyed { metadata, .. } => metadata,
2069 HydroNode::Reduce { metadata, .. } => metadata,
2070 HydroNode::ReduceKeyed { metadata, .. } => metadata,
2071 HydroNode::Network { metadata, .. } => metadata,
2072 HydroNode::Counter { metadata, .. } => metadata,
2073 }
2074 }
2075
2076 pub fn print_root(&self) -> String {
2077 match self {
2078 HydroNode::Placeholder => {
2079 panic!()
2080 }
2081 HydroNode::Source { source, .. } => format!("Source({:?})", source),
2082 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
2083 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
2084 HydroNode::Persist { .. } => "Persist()".to_string(),
2085 HydroNode::Unpersist { .. } => "Unpersist()".to_string(),
2086 HydroNode::Delta { .. } => "Delta()".to_string(),
2087 HydroNode::Chain { first, second, .. } => {
2088 format!("Chain({}, {})", first.print_root(), second.print_root())
2089 }
2090 HydroNode::CrossProduct { left, right, .. } => {
2091 format!(
2092 "CrossProduct({}, {})",
2093 left.print_root(),
2094 right.print_root()
2095 )
2096 }
2097 HydroNode::CrossSingleton { left, right, .. } => {
2098 format!(
2099 "CrossSingleton({}, {})",
2100 left.print_root(),
2101 right.print_root()
2102 )
2103 }
2104 HydroNode::Join { left, right, .. } => {
2105 format!("Join({}, {})", left.print_root(), right.print_root())
2106 }
2107 HydroNode::Difference { pos, neg, .. } => {
2108 format!("Difference({}, {})", pos.print_root(), neg.print_root())
2109 }
2110 HydroNode::AntiJoin { pos, neg, .. } => {
2111 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
2112 }
2113 HydroNode::Map { f, .. } => format!("Map({:?})", f),
2114 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
2115 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
2116 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
2117 HydroNode::DeferTick { .. } => "DeferTick()".to_string(),
2118 HydroNode::Enumerate { is_static, .. } => format!("Enumerate({:?})", is_static),
2119 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
2120 HydroNode::Unique { .. } => "Unique()".to_string(),
2121 HydroNode::Sort { .. } => "Sort()".to_string(),
2122 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
2123 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
2124 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
2125 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
2126 HydroNode::Network { to_location, .. } => format!("Network(to {:?})", to_location),
2127 HydroNode::Counter { tag, duration, .. } => {
2128 format!("Counter({:?}, {:?})", tag, duration)
2129 }
2130 }
2131 }
2132}
2133
/// Allocates ports for one network edge and asks the deployment backend `D` for the
/// expressions and connection action that realize it.
///
/// The `(from_location, to_location)` pair selects which `Deploy` hooks are called:
///
/// | from      | to        | hooks                                   |
/// |-----------|-----------|-----------------------------------------|
/// | process   | process   | `o2o_sink_source` / `o2o_connect`       |
/// | process   | cluster   | `o2m_sink_source` / `o2m_connect`       |
/// | cluster   | process   | `m2o_sink_source` / `m2o_connect`       |
/// | cluster   | cluster   | `m2m_sink_source` / `m2m_connect`       |
/// | external  | process   | `e2o_source` / `e2o_connect`            |
/// | process   | external  | `o2e_sink` / `o2e_connect`              |
///
/// For edges that touch an external process, the expression on the external side is the
/// placeholder `DUMMY`, and the allocated external port is registered on the external
/// node under `from_key` / `to_key` respectively.
///
/// Returns `(sink, source, connect_fn)`: the sender-side expression, the receiver-side
/// expression, and the boxed closure produced by the matching `*_connect` hook.
///
/// # Panics
/// * if a referenced process, cluster, or external id is missing from `nodes`,
///   `clusters`, or `externals`;
/// * if an external endpoint is used without the corresponding `from_key` / `to_key`;
/// * for external -> external edges and for any edge with a `Tick` endpoint;
/// * external <-> cluster edges are not yet implemented (`todo!`).
#[cfg(feature = "build")]
#[expect(clippy::too_many_arguments, reason = "networking internals")]
fn instantiate_network<'a, D: Deploy<'a>>(
    from_location: &LocationId,
    from_key: Option<usize>,
    to_location: &LocationId,
    to_key: Option<usize>,
    nodes: &HashMap<usize, D::Process>,
    clusters: &HashMap<usize, D::Cluster>,
    externals: &HashMap<usize, D::ExternalProcess>,
    compile_env: &D::CompileEnv,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>) {
    /// Looks up the instantiated deployment object for `id`, panicking with the
    /// location `kind` ("process", "cluster", "external") if it was never instantiated.
    fn instantiated<'m, T>(instances: &'m HashMap<usize, T>, id: &usize, kind: &str) -> &'m T {
        instances.get(id).unwrap_or_else(|| {
            panic!("A {} used in the graph was not instantiated: {}", kind, id)
        })
    }

    let ((sink, source), connect_fn) = match (from_location, to_location) {
        (LocationId::Process(from), LocationId::Process(to)) => {
            let from_node = instantiated(nodes, from, "process").clone();
            let to_node = instantiated(nodes, to, "process").clone();

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::o2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Process(from), LocationId::Cluster(to)) => {
            let from_node = instantiated(nodes, from, "process").clone();
            let to_node = instantiated(clusters, to, "cluster").clone();

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::o2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Process(to)) => {
            let from_node = instantiated(clusters, from, "cluster").clone();
            let to_node = instantiated(nodes, to, "process").clone();

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            (
                D::m2o_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(from), LocationId::Cluster(to)) => {
            let from_node = instantiated(clusters, from, "cluster").clone();
            let to_node = instantiated(clusters, to, "cluster").clone();

            let sink_port = D::allocate_cluster_port(&from_node);
            let source_port = D::allocate_cluster_port(&to_node);

            (
                D::m2m_sink_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::ExternalProcess(from), LocationId::Process(to)) => {
            let from_node = instantiated(externals, from, "external").clone();
            let to_node = instantiated(nodes, to, "process").clone();

            let sink_port = D::allocate_external_port(&from_node);
            let source_port = D::allocate_process_port(&to_node);

            // The external side is exposed through a registered port rather than
            // generated code, so it is registered under the caller-supplied key.
            from_node.register(
                from_key.expect("sending from an external process requires a `from_key`"),
                sink_port.clone(),
            );

            (
                (
                    // No sink expression is generated for the external sender.
                    parse_quote!(DUMMY),
                    D::e2o_source(compile_env, &from_node, &sink_port, &to_node, &source_port),
                ),
                D::e2o_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::ExternalProcess(_from), LocationId::Cluster(_to)) => {
            todo!("NYI")
        }
        (LocationId::ExternalProcess(_), LocationId::ExternalProcess(_)) => {
            panic!("Cannot send from external to external")
        }
        (LocationId::Process(from), LocationId::ExternalProcess(to)) => {
            let from_node = instantiated(nodes, from, "process").clone();
            let to_node = instantiated(externals, to, "external").clone();

            let sink_port = D::allocate_process_port(&from_node);
            let source_port = D::allocate_external_port(&to_node);

            // Mirror of the external -> process case: register the receiving port.
            to_node.register(
                to_key.expect("sending to an external process requires a `to_key`"),
                source_port.clone(),
            );

            (
                (
                    D::o2e_sink(compile_env, &from_node, &sink_port, &to_node, &source_port),
                    // No source expression is generated for the external receiver.
                    parse_quote!(DUMMY),
                ),
                D::o2e_connect(&from_node, &sink_port, &to_node, &source_port),
            )
        }
        (LocationId::Cluster(_from), LocationId::ExternalProcess(_to)) => {
            todo!("NYI")
        }
        (LocationId::Tick(_, _), _) => panic!("Cannot send from a tick location"),
        (_, LocationId::Tick(_, _)) => panic!("Cannot send to a tick location"),
    };
    (sink, source, connect_fn)
}