1use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11use tokio::time::Instant;
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_singleton::KeyedSingleton;
15use super::keyed_stream::{Generate, KeyedStream};
16use super::optional::Optional;
17use super::singleton::Singleton;
18use crate::compile::builder::{CycleId, FlowState};
19use crate::compile::ir::{
20 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
21};
22#[cfg(stageleft_runtime)]
23use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
24use crate::forward_handle::{ForwardRef, TickCycle};
25use crate::live_collections::batch_atomic::BatchAtomic;
26use crate::live_collections::singleton::SingletonBound;
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::{Atomic, DeferTick};
30use crate::location::{Location, Tick, TopLevel, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::prelude::manual_proof;
34use crate::properties::{
35 AggFuncAlgebra, ApplyMonotoneStream, StreamMapFuncAlgebra, ValidCommutativityFor,
36 ValidIdempotenceFor, ValidMutBorrowCommutativityFor, ValidMutBorrowIdempotenceFor,
37 ValidMutCommutativityFor, ValidMutIdempotenceFor,
38};
39
40pub mod networking;
41
#[sealed::sealed]
/// Type-level marker for the ordering guarantee carried by a stream.
///
/// The `MinOrder` supertraits fix how an ordering combines with others: combining with
/// itself or with [`TotalOrder`] yields `Self`, and combining with [`NoOrder`] yields
/// [`NoOrder`].
pub trait Ordering:
    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
{
    /// The [`StreamOrder`] value that represents this ordering in IR metadata.
    const ORDERING_KIND: StreamOrder;
}
50
/// Marker type for streams whose elements are totally ordered.
///
/// The enum has no variants, so it can only be used as a type parameter.
pub enum TotalOrder {}

#[sealed::sealed]
impl Ordering for TotalOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
}
60
/// Marker type for streams that carry no ordering guarantee.
///
/// The enum has no variants, so it can only be used as a type parameter.
pub enum NoOrder {}

#[sealed::sealed]
impl Ordering for NoOrder {
    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
}
72
#[sealed::sealed]
/// Marks an ordering as being no stronger than `Other`.
///
/// The blanket impl below provides this for any `O` whose [`MinOrder`] combination with
/// `O2` is `O` itself.
pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
#[sealed::sealed]
impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
80
#[sealed::sealed]
/// Type-level function that combines `Self` with `Other` into a single ordering.
pub trait MinOrder<Other: ?Sized> {
    /// The ordering that results from combining `Self` with `Other`.
    type Min: Ordering;
}

// Combining `TotalOrder` with any ordering `O` yields `O`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for TotalOrder {
    type Min = O;
}

// Combining `NoOrder` with anything yields `NoOrder`.
#[sealed::sealed]
impl<O: Ordering> MinOrder<O> for NoOrder {
    type Min = NoOrder;
}
97
#[sealed::sealed]
/// Type-level marker for the retry (duplication) guarantee carried by a stream.
///
/// The `MinRetries` supertraits fix how a guarantee combines with others: combining with
/// itself or with [`ExactlyOnce`] yields `Self`, and combining with [`AtLeastOnce`] yields
/// [`AtLeastOnce`].
pub trait Retries:
    MinRetries<Self, Min = Self>
    + MinRetries<ExactlyOnce, Min = Self>
    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
{
    /// The [`StreamRetry`] value that represents this guarantee in IR metadata.
    const RETRIES_KIND: StreamRetry;
}
108
/// Marker type for streams with the exactly-once retry guarantee.
///
/// The enum has no variants, so it can only be used as a type parameter.
pub enum ExactlyOnce {}

#[sealed::sealed]
impl Retries for ExactlyOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
}
117
/// Marker type for streams with the at-least-once retry guarantee.
///
/// The enum has no variants, so it can only be used as a type parameter.
pub enum AtLeastOnce {}

#[sealed::sealed]
impl Retries for AtLeastOnce {
    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
}
126
#[sealed::sealed]
/// Marks a retry guarantee as being no stronger than `Other`.
///
/// The blanket impl below provides this for any `R` whose [`MinRetries`] combination with
/// `R2` is `R` itself.
pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
#[sealed::sealed]
impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
134
#[sealed::sealed]
/// Type-level function that combines `Self` with `Other` into a single retry guarantee.
pub trait MinRetries<Other: ?Sized> {
    /// The combined guarantee; it must be weaker-or-equal to both inputs.
    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
}

// Combining `ExactlyOnce` with any guarantee `R` yields `R`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for ExactlyOnce {
    type Min = R;
}

// Combining `AtLeastOnce` with anything yields `AtLeastOnce`.
#[sealed::sealed]
impl<R: Retries> MinRetries<R> for AtLeastOnce {
    type Min = AtLeastOnce;
}
151
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
)]
/// Bound used by operators that require a totally-ordered input.
///
/// The only impl visible here is for [`TotalOrder`]; the `on_unimplemented` attribute
/// customizes the compiler error shown when another ordering is used.
pub trait IsOrdered: Ordering {}

#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsOrdered for TotalOrder {}
164
#[sealed::sealed]
#[diagnostic::on_unimplemented(
    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
)]
/// Bound used by operators that require an exactly-once input.
///
/// The only impl visible here is for [`ExactlyOnce`]; the `on_unimplemented` attribute
/// customizes the compiler error shown when another retry guarantee is used.
pub trait IsExactlyOnce: Retries {}

#[sealed::sealed]
#[diagnostic::do_not_recommend]
impl IsExactlyOnce for ExactlyOnce {}
177
/// Handle to a stream of `Type` elements in the dataflow graph under construction.
///
/// The remaining type parameters are compile-time tags: `Loc` is the location type,
/// `Bound` the boundedness (default [`Unbounded`]), `Order` the ordering guarantee
/// (default [`TotalOrder`]) and `Retry` the retry guarantee (default [`ExactlyOnce`]).
pub struct Stream<
    Type,
    Loc,
    Bound: Boundedness = Unbounded,
    Order: Ordering = TotalOrder,
    Retry: Retries = ExactlyOnce,
> {
    /// The location value this stream is attached to.
    pub(crate) location: Loc,
    /// IR node that produces this stream. Operators take it out with
    /// `replace(HydroNode::Placeholder)` when they consume the stream.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Flow state handle; the `Drop` impl uses it to register a still-unconsumed node.
    pub(crate) flow_state: FlowState,

    // Ties the otherwise-unused type parameters to the struct.
    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
}
210
211impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
212 fn drop(&mut self) {
213 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
214 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
215 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
216 input: Box::new(ir_node),
217 op_metadata: HydroIrOpMetadata::new(),
218 });
219 }
220 }
221}
222
223impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
224 for Stream<T, L, Unbounded, O, R>
225where
226 L: Location<'a>,
227{
228 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
229 let new_meta = stream
230 .location
231 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
232
233 Stream {
234 location: stream.location.clone(),
235 flow_state: stream.flow_state.clone(),
236 ir_node: RefCell::new(HydroNode::Cast {
237 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
238 metadata: new_meta,
239 }),
240 _phantom: PhantomData,
241 }
242 }
243}
244
/// Allows a totally-ordered stream to be converted into one typed as [`NoOrder`].
impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
    for Stream<T, L, B, NoOrder, R>
where
    L: Location<'a>,
{
    /// Delegates to `weaken_ordering`.
    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
        stream.weaken_ordering()
    }
}
254
/// Allows an exactly-once stream to be converted into one typed as [`AtLeastOnce`].
impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
    for Stream<T, L, B, O, AtLeastOnce>
where
    L: Location<'a>,
{
    /// Delegates to `weaken_retries`.
    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
        stream.weaken_retries()
    }
}
264
/// `DeferTick` support for bounded streams that live inside a tick.
impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    /// Forwards to the inherent `Stream::defer_tick` method.
    fn defer_tick(self) -> Self {
        Stream::defer_tick(self)
    }
}
273
274impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
275 for Stream<T, Tick<L>, Bounded, O, R>
276where
277 L: Location<'a>,
278{
279 type Location = Tick<L>;
280
281 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
282 Stream::new(
283 location.clone(),
284 HydroNode::CycleSource {
285 cycle_id,
286 metadata: location.new_node_metadata(Self::collection_kind()),
287 },
288 )
289 }
290}
291
impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
    for Stream<T, Tick<L>, Bounded, O, R>
where
    L: Location<'a>,
{
    type Location = Tick<L>;

    /// Returns this stream's tick location (calls the inherent `Stream::location`).
    fn location(&self) -> &Self::Location {
        self.location()
    }

    /// Creates the source end of a tick cycle that is seeded with `initial`.
    ///
    /// The result is the cycle source wrapped in a `DeferTick` node, chained with `initial`
    /// gated by `location.optional_first_tick(..).is_some()`.
    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
        // Values fed back through the cycle, deferred by one tick.
        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
            location.clone(),
            HydroNode::DeferTick {
                input: Box::new(HydroNode::CycleSource {
                    cycle_id,
                    metadata: location.new_node_metadata(Self::collection_kind()),
                }),
                metadata: location.new_node_metadata(Self::collection_kind()),
            },
        );

        // NOTE(review): presumably `optional_first_tick` is only present during the first
        // tick, so `initial` contributes only then — confirm against `Tick`.
        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
    }
}
318
319impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
320 for Stream<T, Tick<L>, Bounded, O, R>
321where
322 L: Location<'a>,
323{
324 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
325 assert_eq!(
326 Location::id(&self.location),
327 expected_location,
328 "locations do not match"
329 );
330 self.location
331 .flow_state()
332 .borrow_mut()
333 .push_root(HydroRoot::CycleSink {
334 cycle_id,
335 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
336 op_metadata: HydroIrOpMetadata::new(),
337 });
338 }
339}
340
341impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
342 for Stream<T, L, B, O, R>
343where
344 L: Location<'a>,
345{
346 type Location = L;
347
348 fn create_source(cycle_id: CycleId, location: L) -> Self {
349 Stream::new(
350 location.clone(),
351 HydroNode::CycleSource {
352 cycle_id,
353 metadata: location.new_node_metadata(Self::collection_kind()),
354 },
355 )
356 }
357}
358
359impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
360 for Stream<T, L, B, O, R>
361where
362 L: Location<'a>,
363{
364 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
365 assert_eq!(
366 Location::id(&self.location),
367 expected_location,
368 "locations do not match"
369 );
370 self.location
371 .flow_state()
372 .borrow_mut()
373 .push_root(HydroRoot::CycleSink {
374 cycle_id,
375 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
376 op_metadata: HydroIrOpMetadata::new(),
377 });
378 }
379}
380
impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
where
    T: Clone,
    L: Location<'a>,
{
    /// Returns a second handle that reads from the same upstream node.
    ///
    /// Unless this stream's node is already a `HydroNode::Tee`, the node is first moved
    /// into a shared `Rc<RefCell<_>>` and replaced in place by a `Tee` over it. The clone
    /// is then a new `Tee` that points at the same shared inner node and copies the
    /// metadata.
    fn clone(&self) -> Self {
        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
            // Move the original node behind a shared pointer and make `self` a tee of it.
            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
            *self.ir_node.borrow_mut() = HydroNode::Tee {
                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
                metadata: self.location.new_node_metadata(Self::collection_kind()),
            };
        }

        // The branch above guarantees that the node is a `Tee` at this point.
        let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
            unreachable!()
        };
        Stream {
            location: self.location.clone(),
            flow_state: self.flow_state.clone(),
            ir_node: HydroNode::Tee {
                inner: SharedNode(inner.0.clone()),
                metadata: metadata.clone(),
            }
            .into(),
            _phantom: PhantomData,
        }
    }
}
410
411impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
412where
413 L: Location<'a>,
414{
415 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
416 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
417 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
418
419 let flow_state = location.flow_state().clone();
420 Stream {
421 location,
422 flow_state,
423 ir_node: RefCell::new(ir_node),
424 _phantom: PhantomData,
425 }
426 }
427
    /// Returns a reference to the location this stream is attached to.
    pub fn location(&self) -> &L {
        &self.location
    }
432
    /// Creates a `StreamRef` over this stream's IR node cell.
    ///
    /// Only available when the stream's boundedness satisfies `IsBounded`.
    pub fn by_ref(&self) -> crate::handoff_ref::StreamRef<'a, '_, T, L>
    where
        B: IsBounded,
    {
        crate::handoff_ref::StreamRef::new(&self.ir_node)
    }
443
    /// Creates a `StreamMut` over this stream's IR node cell.
    ///
    /// Only available when the stream's boundedness satisfies `IsBounded`. Note that it
    /// takes `&self`; the node sits in a `RefCell`.
    pub fn by_mut(&self) -> crate::handoff_ref::StreamMut<'a, '_, T, L>
    where
        B: IsBounded,
    {
        crate::handoff_ref::StreamMut::new(&self.ir_node)
    }
452
453 pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
456 where
457 L: Location<'a>,
458 {
459 if L::consistency()
460 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
461 {
462 Stream::new(
464 self.location.drop_consistency(),
465 self.ir_node.replace(HydroNode::Placeholder),
466 )
467 } else {
468 Stream::new(
469 self.location.drop_consistency(),
470 HydroNode::Cast {
471 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
472 metadata: self.location.drop_consistency().new_node_metadata(Stream::<
473 T,
474 L::DropConsistency,
475 B,
476 O,
477 R,
478 >::collection_kind(
479 )),
480 },
481 )
482 }
483 }
484
    /// Re-types this stream at `L2`, a location type with the same `DropConsistency` as `L`.
    ///
    /// If `L` and `L2` report equal `consistency()`, the IR node is moved over unchanged.
    /// Otherwise it is wrapped in a `HydroNode::AssertIsConsistent` with `trusted: false`.
    ///
    /// `_proof` is not inspected by this function.
    /// NOTE(review): presumably it exists to make callers justify the assertion — confirm
    /// against `ConsistencyProof`.
    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
        self,
        _proof: impl crate::properties::ConsistencyProof,
    ) -> Stream<T, L2, B, O, R>
    where
        L: Location<'a>,
    {
        if L::consistency() == L2::consistency() {
            // Same consistency: only the static type changes.
            Stream::new(
                self.location.with_consistency_of(),
                self.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            Stream::new(
                self.location.with_consistency_of(),
                HydroNode::AssertIsConsistent {
                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    trusted: false,
                    metadata: self
                        .location
                        .clone()
                        .with_consistency_of::<L2>()
                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
                },
            )
        }
    }
515
    /// Crate-internal variant of `assert_has_consistency_of`.
    ///
    /// The only difference is that the `HydroNode::AssertIsConsistent` node is created
    /// with `trusted: true`. As in the public variant, the node is moved over unchanged
    /// when `L` and `L2` report equal `consistency()`, and `_proof` is not inspected.
    pub(crate) fn assert_has_consistency_of_trusted<
        L2: Location<'a, DropConsistency = L::DropConsistency>,
    >(
        self,
        _proof: impl crate::properties::ConsistencyProof,
    ) -> Stream<T, L2, B, O, R>
    where
        L: Location<'a>,
    {
        if L::consistency() == L2::consistency() {
            // Same consistency: only the static type changes.
            Stream::new(
                self.location.with_consistency_of(),
                self.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            Stream::new(
                self.location.with_consistency_of(),
                HydroNode::AssertIsConsistent {
                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    trusted: true,
                    metadata: self
                        .location
                        .clone()
                        .with_consistency_of::<L2>()
                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
                },
            )
        }
    }
545
    /// Builds the `CollectionKind::Stream` descriptor for this stream type from its
    /// boundedness, ordering, retry guarantee and quoted element type.
    pub(crate) fn collection_kind() -> CollectionKind {
        CollectionKind::Stream {
            bound: B::BOUND_KIND,
            order: O::ORDERING_KIND,
            retry: R::RETRIES_KIND,
            element_type: quote_type::<T>().into(),
        }
    }
554
555 pub fn map<U, F, C, I, const WAS_MUT: bool>(
575 self,
576 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, I>>,
577 ) -> Stream<U, L, B, O, R>
578 where
579 F: FnMut(T) -> U + 'a,
580 C: ValidMutCommutativityFor<F, T, U, O, WAS_MUT>,
581 I: ValidMutIdempotenceFor<F, T, U, R, WAS_MUT>,
582 {
583 let f = crate::handoff_ref::with_ref_capture(|| {
584 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
585 proof.register_proof(&expr);
586 expr.into()
587 });
588 Stream::new(
589 self.location.clone(),
590 HydroNode::Map {
591 f,
592 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
593 metadata: self
594 .location
595 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
596 },
597 )
598 }
599
600 pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
625 self,
626 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
627 ) -> Stream<U, L, B, O, R>
628 where
629 I: IntoIterator<Item = U>,
630 F: FnMut(T) -> I + 'a,
631 C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
632 Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
633 {
634 let f = crate::handoff_ref::with_ref_capture(|| {
635 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
636 proof.register_proof(&expr);
637 expr.into()
638 });
639 Stream::new(
640 self.location.clone(),
641 HydroNode::FlatMap {
642 f,
643 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
644 metadata: self
645 .location
646 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
647 },
648 )
649 }
650
651 pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
678 self,
679 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
680 ) -> Stream<U, L, B, NoOrder, R>
681 where
682 I: IntoIterator<Item = U>,
683 F: FnMut(T) -> I + 'a,
684 C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
685 Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
686 {
687 let f = crate::handoff_ref::with_ref_capture(|| {
688 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
689 proof.register_proof(&expr);
690 expr.into()
691 });
692 Stream::new(
693 self.location.clone(),
694 HydroNode::FlatMap {
695 f,
696 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
697 metadata: self
698 .location
699 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
700 },
701 )
702 }
703
    /// Flattens a stream of iterables into a stream of their items, keeping ordering `O`.
    ///
    /// Equivalent to `flat_map_ordered` with the identity function.
    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_ordered(q!(|d| d))
    }
732
    /// Flattens a stream of iterables into a stream of their items, typed as [`NoOrder`].
    ///
    /// Equivalent to `flat_map_unordered` with the identity function.
    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
    where
        T: IntoIterator<Item = U>,
    {
        self.flat_map_unordered(q!(|d| d))
    }
765
766 pub fn flat_map_stream_blocking<U, S, F, C, Idemp, const WAS_MUT: bool>(
770 self,
771 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
772 ) -> Stream<U, L, B, O, R>
773 where
774 S: futures::Stream<Item = U>,
775 F: FnMut(T) -> S + 'a,
776 C: ValidMutCommutativityFor<F, T, S, O, WAS_MUT>,
777 Idemp: ValidMutIdempotenceFor<F, T, S, R, WAS_MUT>,
778 {
779 let f = crate::handoff_ref::with_ref_capture(|| {
780 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
781 proof.register_proof(&expr);
782 expr.into()
783 });
784 Stream::new(
785 self.location.clone(),
786 HydroNode::FlatMapStreamBlocking {
787 f,
788 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
789 metadata: self
790 .location
791 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
792 },
793 )
794 }
795
    /// Flattens a stream of `futures::Stream`s into a stream of their items.
    ///
    /// Equivalent to `flat_map_stream_blocking` with the identity function.
    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
    where
        T: futures::Stream<Item = U>,
    {
        self.flat_map_stream_blocking(q!(|d| d))
    }
805
806 pub fn filter<F, C, Idemp, const WAS_MUT: bool>(
831 self,
832 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
833 ) -> Self
834 where
835 F: FnMut(&T) -> bool + 'a,
836 C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
837 Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
838 {
839 let f = crate::handoff_ref::with_ref_capture(|| {
840 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
841 proof.register_proof(&expr);
842 expr.into()
843 });
844 Stream::new(
845 self.location.clone(),
846 HydroNode::Filter {
847 f,
848 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
849 metadata: self.location.new_node_metadata(Self::collection_kind()),
850 },
851 )
852 }
853
854 #[expect(
889 clippy::type_complexity,
890 reason = "return type mirrors the input stream type"
891 )]
892 pub fn partition<F, C, Idemp, const WAS_MUT: bool>(
893 self,
894 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
895 ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
896 where
897 F: FnMut(&T) -> bool + 'a,
898 C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
899 Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
900 {
901 let f = crate::handoff_ref::with_ref_capture(|| {
902 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
903 proof.register_proof(&expr);
904 expr.into()
905 });
906 let shared = SharedNode(Rc::new(RefCell::new(
907 self.ir_node.replace(HydroNode::Placeholder),
908 )));
909
910 let true_stream = Stream::new(
911 self.location.clone(),
912 HydroNode::Partition {
913 inner: SharedNode(shared.0.clone()),
914 f: f.clone(),
915 is_true: true,
916 metadata: self.location.new_node_metadata(Self::collection_kind()),
917 },
918 );
919
920 let false_stream = Stream::new(
921 self.location.clone(),
922 HydroNode::Partition {
923 inner: SharedNode(shared.0),
924 f,
925 is_true: false,
926 metadata: self.location.new_node_metadata(Self::collection_kind()),
927 },
928 );
929
930 (true_stream, false_stream)
931 }
932
933 pub fn filter_map<U, F, C, Idemp, const WAS_MUT: bool>(
953 self,
954 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
955 ) -> Stream<U, L, B, O, R>
956 where
957 F: FnMut(T) -> Option<U> + 'a,
958 C: ValidMutCommutativityFor<F, T, Option<U>, O, WAS_MUT>,
959 Idemp: ValidMutIdempotenceFor<F, T, Option<U>, R, WAS_MUT>,
960 {
961 let f = crate::handoff_ref::with_ref_capture(|| {
962 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
963 proof.register_proof(&expr);
964 expr.into()
965 });
966 Stream::new(
967 self.location.clone(),
968 HydroNode::FilterMap {
969 f,
970 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
971 metadata: self
972 .location
973 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
974 },
975 )
976 }
977
978 pub fn cross_singleton<O2>(
1003 self,
1004 other: impl Into<Optional<O2, L, Bounded>>,
1005 ) -> Stream<(T, O2), L, B, O, R>
1006 where
1007 O2: Clone,
1008 {
1009 let other: Optional<O2, L, Bounded> = other.into();
1010 check_matching_location(&self.location, &other.location);
1011
1012 Stream::new(
1013 self.location.clone(),
1014 HydroNode::CrossSingleton {
1015 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1016 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1017 metadata: self
1018 .location
1019 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
1020 },
1021 )
1022 }
1023
1024 pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
1056 self.cross_singleton(signal.filter(q!(|b| *b)))
1057 .map(q!(|(d, _)| d))
1058 }
1059
    /// Gates the stream on `signal.is_some()`; forwards to `filter_if`.
    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        self.filter_if(signal.is_some())
    }
1098
    /// Gates the stream on `other.is_none()`; forwards to `filter_if`.
    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
        self.filter_if(other.is_none())
    }
1137
    /// Produces all pairs `(a, b)` with `a` from this stream and `b` from `other`.
    ///
    /// Implemented as a `join` on a constant `()` key: both sides are mapped to
    /// `((), value)`, joined, and the key is stripped again. The output ordering is
    /// `B2::PreserveOrderIfBounded<O>` and the retry guarantee is the `MinRetries`
    /// combination of `R` and `R2`.
    #[expect(
        clippy::type_complexity,
        reason = "MinRetries projection in return type"
    )]
    pub fn cross_product<T2, B2: Boundedness, O2: Ordering, R2: Retries>(
        self,
        other: Stream<T2, L, B2, O2, R2>,
    ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
    where
        T: Clone,
        T2: Clone,
        R: MinRetries<R2>,
    {
        self.map(q!(|v| ((), v)))
            .join(other.map(q!(|v| ((), v))))
            .map(q!(|((), (v1, v2))| (v1, v2)))
    }
1179
1180 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1199 where
1200 T: Eq + Hash,
1201 {
1202 Stream::new(
1203 self.location.clone(),
1204 HydroNode::Unique {
1205 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1206 metadata: self
1207 .location
1208 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1209 },
1210 )
1211 }
1212
1213 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1239 where
1240 T: Eq + Hash,
1241 B2: IsBounded,
1242 {
1243 check_matching_location(&self.location, &other.location);
1244
1245 Stream::new(
1246 self.location.clone(),
1247 HydroNode::Difference {
1248 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1249 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1250 metadata: self
1251 .location
1252 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1253 },
1254 )
1255 }
1256
1257 pub fn inspect<F, C, Idemp, const WAS_MUT: bool>(
1278 self,
1279 f: impl IntoQuotedMut<'a, F, L::DropConsistency, StreamMapFuncAlgebra<C, Idemp>>,
1280 ) -> Self
1281 where
1282 F: FnMut(&T) + 'a,
1283 C: ValidMutBorrowCommutativityFor<F, T, (), O, WAS_MUT>,
1284 Idemp: ValidMutBorrowIdempotenceFor<F, T, (), R, WAS_MUT>,
1285 {
1286 let f = crate::handoff_ref::with_ref_capture(|| {
1287 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location.drop_consistency());
1288 proof.register_proof(&expr);
1289 expr.into()
1290 });
1291
1292 Stream::new(
1293 self.location.clone(),
1294 HydroNode::Inspect {
1295 f,
1296 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1297 metadata: self.location.new_node_metadata(Self::collection_kind()),
1298 },
1299 )
1300 }
1301
1302 pub fn for_each<F: FnMut(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1313 where
1314 O: IsOrdered,
1315 R: IsExactlyOnce,
1316 {
1317 let f = crate::handoff_ref::with_ref_capture(|| f.splice_fnmut1_ctx(&self.location).into());
1318 self.location
1319 .flow_state()
1320 .borrow_mut()
1321 .push_root(HydroRoot::ForEach {
1322 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1323 f,
1324 op_metadata: HydroIrOpMetadata::new(),
1325 });
1326 }
1327
1328 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1334 where
1335 O: IsOrdered,
1336 R: IsExactlyOnce,
1337 S: 'a + futures::Sink<T> + Unpin,
1338 {
1339 self.location
1340 .flow_state()
1341 .borrow_mut()
1342 .push_root(HydroRoot::DestSink {
1343 sink: sink.splice_typed_ctx(&self.location).into(),
1344 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1345 op_metadata: HydroIrOpMetadata::new(),
1346 });
1347 }
1348
1349 pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1369 where
1370 O: IsOrdered,
1371 R: IsExactlyOnce,
1372 {
1373 Stream::new(
1374 self.location.clone(),
1375 HydroNode::Enumerate {
1376 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1377 metadata: self.location.new_node_metadata(Stream::<
1378 (usize, T),
1379 L,
1380 B,
1381 TotalOrder,
1382 ExactlyOnce,
1383 >::collection_kind()),
1384 },
1385 )
1386 }
1387
    /// Folds the stream into a [`Singleton`] accumulator.
    ///
    /// `init` creates the initial accumulator and `comb` merges one element into it. The
    /// `C` and `Idemp` bounds tie the properties declared for `comb` to this stream's
    /// ordering `O` and retry guarantee `R`. The output boundedness `B2` is selected by
    /// `B: ApplyMonotoneStream<M, B2>`.
    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
        self,
        init: impl IntoQuotedMut<'a, I, L>,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
    ) -> Singleton<A, L, B2>
    where
        I: Fn() -> A + 'a,
        F: 'a + Fn(&mut A, T),
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
        B: ApplyMonotoneStream<M, B2>,
    {
        let init = init.splice_fn0_ctx(&self.location).into();
        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&comb);

        let nondet = nondet!();
        // Re-type the input as exactly-once at the consistency-dropped location.
        // NOTE(review): presumably justified by the `Idemp: ValidIdempotenceFor<R>`
        // bound — confirm.
        let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);

        let core = HydroNode::Fold {
            init,
            acc: comb.into(),
            input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
            metadata: retried
                .location
                .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
        };

        // The node is built at `L::DropConsistency`; the result is then re-typed at `L`.
        Singleton::new(retried.location.clone(), core)
            .assert_has_consistency_of(manual_proof!())
    }
1448
    /// Reduces the stream into an [`Optional`] by merging elements pairwise with `comb`.
    ///
    /// The `C` and `Idemp` bounds tie the properties declared for `comb` to this stream's
    /// ordering `O` and retry guarantee `R`. The proof that accompanies `comb` is
    /// registered before the `HydroNode::Reduce` is built.
    pub fn reduce<F, C, Idemp>(
        self,
        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
    ) -> Optional<T, L, B>
    where
        F: Fn(&mut T, T) + 'a,
        C: ValidCommutativityFor<O>,
        Idemp: ValidIdempotenceFor<R>,
    {
        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
        proof.register_proof(&f);

        let nondet = nondet!();
        // Re-type the input as totally-ordered and exactly-once (the defaults of `Stream`)
        // at the consistency-dropped location.
        // NOTE(review): presumably justified by the `C`/`Idemp` bounds — confirm.
        let ordered_etc: Stream<T, L::DropConsistency, B> =
            self.assume_retries(nondet).assume_ordering(nondet);

        let core = HydroNode::Reduce {
            f: f.into(),
            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
            metadata: ordered_etc
                .location
                .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
        };

        // The node is built at `L::DropConsistency`; the result is then re-typed at `L`.
        Optional::new(ordered_etc.location.clone(), core)
            .assert_has_consistency_of(manual_proof!())
    }
1498
    /// Computes the maximum element as an [`Optional`].
    ///
    /// The stream is first re-typed as exactly-once and totally-ordered through the
    /// `*_trusted` assumption helpers, then reduced by replacing the current value
    /// whenever a strictly greater element arrives.
    pub fn max(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        self.assume_retries_trusted::<ExactlyOnce>(nondet!())
            .assume_ordering_trusted_bounded::<TotalOrder>(
                nondet!(),
            )
            .reduce(q!(|curr, new| {
                if new > *curr {
                    *curr = new;
                }
            }))
    }
1532
    /// Computes the minimum element as an [`Optional`].
    ///
    /// The stream is first re-typed as exactly-once and totally-ordered through the
    /// `*_trusted` assumption helpers, then reduced by replacing the current value
    /// whenever a strictly smaller element arrives.
    pub fn min(self) -> Optional<T, L, B>
    where
        T: Ord,
    {
        self.assume_retries_trusted::<ExactlyOnce>(nondet!())
            .assume_ordering_trusted_bounded::<TotalOrder>(
                nondet!(),
            )
            .reduce(q!(|curr, new| {
                if new < *curr {
                    *curr = new;
                }
            }))
    }
1566
    /// Produces the first element of a totally-ordered stream as an [`Optional`].
    ///
    /// A `generator` stage returns `Generate::Return(item)` for the first element it sees.
    /// The trailing `reduce` uses a no-op combiner, so it keeps the first value it
    /// receives.
    pub fn first(self) -> Optional<T, L, B>
    where
        O: IsOrdered,
    {
        self.make_totally_ordered()
            .assume_retries_trusted::<ExactlyOnce>(nondet!())
            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
            .reduce(q!(|_, _| {}))
    }
1598
    /// Produces the last element of a totally-ordered stream as an [`Optional`].
    ///
    /// Implemented as a `reduce` that overwrites the current value with each new element.
    pub fn last(self) -> Optional<T, L, B>
    where
        O: IsOrdered,
    {
        self.make_totally_ordered()
            .assume_retries_trusted::<ExactlyOnce>(nondet!())
            .reduce(q!(|curr, new| *curr = new))
    }
1629
    /// Limits the stream to `n` elements, using a counting `generator`.
    ///
    /// The counter starts at zero. If it already equals `n` when an element arrives (only
    /// possible for `n == 0`, given the `Return` below), the generator breaks without
    /// emitting. Otherwise the counter is incremented; the element that brings it to `n`
    /// is emitted with `Generate::Return`, and earlier elements with `Generate::Yield`.
    pub fn limit(
        self,
        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        self.generator(
            q!(|| 0usize),
            q!(move |count, item| {
                if *count == n {
                    Generate::Break
                } else {
                    *count += 1;
                    if *count == n {
                        Generate::Return(item)
                    } else {
                        Generate::Yield(item)
                    }
                }
            }),
        )
    }
1676
    /// Collects all elements into a `Vec<T>` singleton.
    ///
    /// Implemented as a `fold` that starts from an empty vector and pushes each element.
    /// Requires a totally-ordered, exactly-once stream.
    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
    {
        self.make_totally_ordered().make_exactly_once().fold(
            q!(|| vec![]),
            q!(|acc, v| {
                acc.push(v);
            }),
        )
    }
1714
1715 pub fn scan<A, U, I, F>(
1776 self,
1777 init: impl IntoQuotedMut<'a, I, L>,
1778 f: impl IntoQuotedMut<'a, F, L>,
1779 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1780 where
1781 O: IsOrdered,
1782 R: IsExactlyOnce,
1783 I: Fn() -> A + 'a,
1784 F: Fn(&mut A, T) -> Option<U> + 'a,
1785 {
1786 let init = init.splice_fn0_ctx(&self.location).into();
1787 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1788
1789 Stream::new(
1790 self.location.clone(),
1791 HydroNode::Scan {
1792 init,
1793 acc: f,
1794 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1795 metadata: self.location.new_node_metadata(
1796 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1797 ),
1798 },
1799 )
1800 }
1801
1802 pub fn scan_async_blocking<A, U, I, F, Fut>(
1836 self,
1837 init: impl IntoQuotedMut<'a, I, L>,
1838 f: impl IntoQuotedMut<'a, F, L>,
1839 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1840 where
1841 O: IsOrdered,
1842 R: IsExactlyOnce,
1843 I: Fn() -> A + 'a,
1844 F: Fn(&mut A, T) -> Fut + 'a,
1845 Fut: Future<Output = Option<U>> + 'a,
1846 {
1847 let init = init.splice_fn0_ctx(&self.location).into();
1848 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1849
1850 Stream::new(
1851 self.location.clone(),
1852 HydroNode::ScanAsyncBlocking {
1853 init,
1854 acc: f,
1855 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1856 metadata: self.location.new_node_metadata(
1857 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1858 ),
1859 },
1860 )
1861 }
1862
    /// Runs a stateful generator over the stream.
    ///
    /// `init` creates the state and `f` maps `(state, element)` to a [`Generate`] command.
    /// Internally this is a `HydroNode::Scan` that emits `Option<U>` values, followed by a
    /// `HydroNode::FlatMap` with the identity function that flattens those options away.
    ///
    /// Requires a totally-ordered, exactly-once stream.
    pub fn generator<A, U, I, F>(
        self,
        init: impl IntoQuotedMut<'a, I, L> + Copy,
        f: impl IntoQuotedMut<'a, F, L> + Copy,
    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
    where
        O: IsOrdered,
        R: IsExactlyOnce,
        I: Fn() -> A + 'a,
        F: Fn(&mut A, T) -> Generate<U> + 'a,
    {
        // Wrap the user closures so they can be referenced from inside the `q!` below.
        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));

        let this = self.make_totally_ordered().make_exactly_once();

        // Scan state encoding:
        //   None          -> user state not created yet
        //   Some(Some(s)) -> generator running with user state `s`
        //   Some(None)    -> generator finished after a `Return`
        let scan_init = q!(|| None)
            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
            .into();
        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
            // Create the user state lazily, on the first element.
            if state.is_none() {
                *state = Some(Some(init()));
            }
            match state {
                Some(Some(state_value)) => match f(state_value, v) {
                    // Emit `out` and keep running.
                    Generate::Yield(out) => Some(Some(out)),
                    // Emit `out` and mark the generator as finished.
                    Generate::Return(out) => {
                        *state = Some(None);
                        Some(Some(out))
                    }
                    // NOTE(review): presumably a `None` from the scan closure ends the
                    // scan — confirm against `HydroNode::Scan`.
                    Generate::Break => None,
                    // Emit nothing for this element (`Some(None)` flattens to no item).
                    Generate::Continue => Some(None),
                },
                // Finished state (`Some(None)`).
                _ => None,
            }
        })
        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
        .into();

        let scan_node = HydroNode::Scan {
            init: scan_init,
            acc: scan_f,
            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
            metadata: this.location.new_node_metadata(Stream::<
                Option<U>,
                L,
                B,
                TotalOrder,
                ExactlyOnce,
            >::collection_kind()),
        };

        // Flatten the `Option<U>` items produced by the scan into plain `U` items.
        let flatten_f = q!(|d| d)
            .splice_fn1_ctx::<Option<U>, _>(&this.location)
            .into();
        let flatten_node = HydroNode::FlatMap {
            f: flatten_f,
            input: Box::new(scan_node),
            metadata: this
                .location
                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
        };

        Stream::new(this.location.clone(), flatten_node)
    }
1975
1976 pub fn sample_every(
1985 self,
1986 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1987 nondet: NonDet,
1988 ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1989 where
1990 L: TopLevel<'a>,
1991 {
1992 let samples = self.location.source_interval(interval);
1993
1994 let tick = self.location.tick();
1995 self.batch(&tick, nondet)
1996 .filter_if(samples.batch(&tick, nondet).first().is_some())
1997 .all_ticks()
1998 .weaken_retries()
1999 }
2000
    /// Builds an [`Optional`] that is present while the stream has been quiet for longer
    /// than `duration`.
    ///
    /// The stream is first reinterpreted as exactly-once via `assume_retries`, then folded
    /// into the `Instant` at which the most recent element was processed (`None` before any
    /// element arrives). That value is snapshotted into a fresh tick and mapped to
    /// `Some(())` when either no element has been seen yet or more than `duration` has
    /// elapsed since the recorded instant; otherwise the snapshot is filtered out.
    ///
    /// `nondet` is forwarded to both `assume_retries` and `snapshot`.
    pub fn timeout(
        self,
        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
        nondet: NonDet,
    ) -> Optional<(), L::DropConsistency, Unbounded>
    where
        L: TopLevel<'a>,
    {
        let tick = self.location.tick();

        // Track the wall-clock time of the latest element; the element itself is ignored.
        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
            q!(|| None),
            q!(
                |latest, _| {
                    *latest = Some(Instant::now());
                },
                commutative = manual_proof!()
            ),
        );

        latest_received
            .snapshot(&tick, nondet)
            .filter_map(q!(move |latest_received| {
                if let Some(latest_received) = latest_received {
                    // Timed out only when strictly more than `duration` has elapsed.
                    if Instant::now().duration_since(latest_received) > duration {
                        Some(())
                    } else {
                        None
                    }
                } else {
                    // Nothing has ever been received: treated as timed out.
                    Some(())
                }
            }))
            .latest()
    }
2045
2046 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
2052 let id = self.location.flow_state().borrow_mut().next_clock_id();
2053 let out_location = Atomic {
2054 tick: Tick {
2055 id,
2056 l: self.location.clone(),
2057 },
2058 };
2059 Stream::new(
2060 out_location.clone(),
2061 HydroNode::BeginAtomic {
2062 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2063 metadata: out_location
2064 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2065 },
2066 )
2067 }
2068
2069 pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2077 self,
2078 tick: &Tick<L2>,
2079 _nondet: NonDet,
2080 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2081 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2082 Stream::new(
2083 tick.drop_consistency(),
2084 HydroNode::Batch {
2085 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2086 metadata: tick
2087 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2088 },
2089 )
2090 }
2091
2092 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2095 {
2096 let mut node = self.ir_node.borrow_mut();
2097 let metadata = node.metadata_mut();
2098 metadata.tag = Some(name.to_owned());
2099 }
2100 self
2101 }
2102
2103 pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2107 where
2108 B: IsBounded,
2109 {
2110 Optional::new(
2111 self.location.clone(),
2112 HydroNode::Cast {
2113 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2114 metadata: self
2115 .location
2116 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2117 },
2118 )
2119 }
2120
2121 pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2122 if O::ORDERING_KIND == O2::ORDERING_KIND {
2123 Stream::new(
2124 self.location.clone(),
2125 self.ir_node.replace(HydroNode::Placeholder),
2126 )
2127 } else {
2128 panic!(
2129 "Runtime ordering {:?} did not match requested cast {:?}.",
2130 O::ORDERING_KIND,
2131 O2::ORDERING_KIND
2132 )
2133 }
2134 }
2135
2136 pub fn assume_ordering<O2: Ordering>(
2145 self,
2146 _nondet: NonDet,
2147 ) -> Stream<T, L::DropConsistency, B, O2, R> {
2148 if O::ORDERING_KIND == O2::ORDERING_KIND {
2149 self.use_ordering_type().weaken_consistency()
2150 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2151 let target_location = self.location().drop_consistency();
2153 Stream::new(
2154 target_location.clone(),
2155 HydroNode::Cast {
2156 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2157 metadata: target_location
2158 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2159 },
2160 )
2161 } else {
2162 let target_location = self.location().drop_consistency();
2163 Stream::new(
2164 target_location.clone(),
2165 HydroNode::ObserveNonDet {
2166 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2167 trusted: false,
2168 metadata: target_location
2169 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2170 },
2171 )
2172 }
2173 }
2174
2175 fn assume_ordering_trusted_bounded<O2: Ordering>(
2178 self,
2179 nondet: NonDet,
2180 ) -> Stream<T, L, B, O2, R> {
2181 if B::BOUNDED {
2182 self.assume_ordering_trusted(nondet)
2183 } else {
2184 let self_location = self.location.clone();
2185 let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2186 Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2187 }
2188 }
2189
2190 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2193 self,
2194 _nondet: NonDet,
2195 ) -> Stream<T, L, B, O2, R> {
2196 if O::ORDERING_KIND == O2::ORDERING_KIND {
2197 self.use_ordering_type()
2198 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2199 Stream::new(
2201 self.location.clone(),
2202 HydroNode::Cast {
2203 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2204 metadata: self
2205 .location
2206 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2207 },
2208 )
2209 } else {
2210 Stream::new(
2211 self.location.clone(),
2212 HydroNode::ObserveNonDet {
2213 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2214 trusted: true,
2215 metadata: self
2216 .location
2217 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2218 },
2219 )
2220 }
2221 }
2222
2223 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2224 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2227 self.weaken_ordering::<NoOrder>()
2228 }
2229
2230 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2233 let nondet = nondet!();
2234 self.assume_ordering_trusted::<O2>(nondet)
2235 }
2236
2237 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2240 where
2241 O: IsOrdered,
2242 {
2243 self.assume_ordering_trusted(nondet!())
2244 }
2245
2246 pub fn assume_retries<R2: Retries>(
2255 self,
2256 _nondet: NonDet,
2257 ) -> Stream<T, L::DropConsistency, B, O, R2> {
2258 if R::RETRIES_KIND == R2::RETRIES_KIND {
2259 Stream::new(
2260 self.location.drop_consistency(),
2261 self.ir_node.replace(HydroNode::Placeholder),
2262 )
2263 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2264 let target_location = self.location.drop_consistency();
2266 Stream::new(
2267 target_location.clone(),
2268 HydroNode::Cast {
2269 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2270 metadata: target_location
2271 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2272 },
2273 )
2274 } else {
2275 let target_location = self.location.drop_consistency();
2276 Stream::new(
2277 target_location.clone(),
2278 HydroNode::ObserveNonDet {
2279 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2280 trusted: false,
2281 metadata: target_location
2282 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2283 },
2284 )
2285 }
2286 }
2287
2288 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2291 if R::RETRIES_KIND == R2::RETRIES_KIND {
2292 Stream::new(
2293 self.location.clone(),
2294 self.ir_node.replace(HydroNode::Placeholder),
2295 )
2296 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2297 Stream::new(
2299 self.location.clone(),
2300 HydroNode::Cast {
2301 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2302 metadata: self
2303 .location
2304 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2305 },
2306 )
2307 } else {
2308 Stream::new(
2309 self.location.clone(),
2310 HydroNode::ObserveNonDet {
2311 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2312 trusted: true,
2313 metadata: self
2314 .location
2315 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2316 },
2317 )
2318 }
2319 }
2320
2321 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2322 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2325 self.weaken_retries::<AtLeastOnce>()
2326 }
2327
2328 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2331 let nondet = nondet!();
2332 self.assume_retries_trusted::<R2>(nondet)
2333 }
2334
2335 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2338 where
2339 R: IsExactlyOnce,
2340 {
2341 self.assume_retries_trusted(nondet!())
2342 }
2343
2344 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2347 where
2348 B: IsBounded,
2349 {
2350 self.weaken_boundedness()
2351 }
2352
2353 pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2356 if B::BOUNDED == B2::BOUNDED {
2357 Stream::new(
2358 self.location.clone(),
2359 self.ir_node.replace(HydroNode::Placeholder),
2360 )
2361 } else {
2362 Stream::new(
2364 self.location.clone(),
2365 HydroNode::Cast {
2366 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2367 metadata: self
2368 .location
2369 .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2370 },
2371 )
2372 }
2373 }
2374}
2375
2376impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2377where
2378 L: Location<'a>,
2379{
2380 pub fn cloned(self) -> Stream<T, L, B, O, R>
2398 where
2399 T: Clone,
2400 {
2401 self.map(q!(|d| d.clone()))
2402 }
2403}
2404
2405impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2406where
2407 L: Location<'a>,
2408{
2409 pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2428 self.assume_ordering_trusted::<TotalOrder>(nondet!(
2429 ))
2431 .fold(
2432 q!(|| 0usize),
2433 q!(
2434 |count, _| *count += 1,
2435 monotone = manual_proof!()
2436 ),
2437 )
2438 }
2439}
2440
2441impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2442 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2466 self,
2467 other: Stream<T, L, Unbounded, O2, R2>,
2468 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2469 where
2470 R: MinRetries<R2>,
2471 {
2472 Stream::new(
2473 self.location.clone(),
2474 HydroNode::Chain {
2475 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2476 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2477 metadata: self.location.new_node_metadata(Stream::<
2478 T,
2479 L,
2480 Unbounded,
2481 NoOrder,
2482 <R as MinRetries<R2>>::Min,
2483 >::collection_kind()),
2484 },
2485 )
2486 }
2487
2488 #[deprecated(note = "use `merge_unordered` instead")]
2490 pub fn interleave<O2: Ordering, R2: Retries>(
2491 self,
2492 other: Stream<T, L, Unbounded, O2, R2>,
2493 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2494 where
2495 R: MinRetries<R2>,
2496 {
2497 self.merge_unordered(other)
2498 }
2499}
2500
2501impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2502 pub fn merge_ordered<R2: Retries>(
2530 self,
2531 other: Stream<T, L, B, TotalOrder, R2>,
2532 _nondet: NonDet,
2533 ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2534 where
2535 R: MinRetries<R2>,
2536 {
2537 let target_location = self.location().drop_consistency();
2538 Stream::new(
2539 target_location.clone(),
2540 HydroNode::MergeOrdered {
2541 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2542 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2543 metadata: target_location.new_node_metadata(Stream::<
2544 T,
2545 L::DropConsistency,
2546 B,
2547 TotalOrder,
2548 <R as MinRetries<R2>>::Min,
2549 >::collection_kind()),
2550 },
2551 )
2552 }
2553}
2554
2555impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2556where
2557 L: Location<'a>,
2558{
2559 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2585 where
2586 B: IsBounded,
2587 T: Ord,
2588 {
2589 let this = self.make_bounded();
2590 Stream::new(
2591 this.location.clone(),
2592 HydroNode::Sort {
2593 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2594 metadata: this
2595 .location
2596 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2597 },
2598 )
2599 }
2600
2601 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2629 self,
2630 other: Stream<T, L, B2, O2, R2>,
2631 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2632 where
2633 B: IsBounded,
2634 O: MinOrder<O2>,
2635 R: MinRetries<R2>,
2636 {
2637 check_matching_location(&self.location, &other.location);
2638
2639 Stream::new(
2640 self.location.clone(),
2641 HydroNode::Chain {
2642 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2643 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2644 metadata: self.location.new_node_metadata(Stream::<
2645 T,
2646 L,
2647 B2,
2648 <O as MinOrder<O2>>::Min,
2649 <R as MinRetries<R2>>::Min,
2650 >::collection_kind()),
2651 },
2652 )
2653 }
2654
2655 #[expect(
2659 clippy::type_complexity,
2660 reason = "MinRetries projection in return type"
2661 )]
2662 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>, R2: Retries>(
2663 self,
2664 other: Stream<T2, L, Bounded, O2, R2>,
2665 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, <R as MinRetries<R2>>::Min>
2666 where
2667 B: IsBounded,
2668 T: Clone,
2669 T2: Clone,
2670 R: MinRetries<R2>,
2671 {
2672 let this = self.make_bounded();
2673 check_matching_location(&this.location, &other.location);
2674
2675 Stream::new(
2676 this.location.clone(),
2677 HydroNode::CrossProduct {
2678 left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2679 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2680 metadata: this.location.new_node_metadata(Stream::<
2681 (T, T2),
2682 L,
2683 Bounded,
2684 <O2 as MinOrder<O>>::Min,
2685 <R as MinRetries<R2>>::Min,
2686 >::collection_kind()),
2687 },
2688 )
2689 }
2690
2691 pub fn repeat_with_keys<K, V2>(
2729 self,
2730 keys: KeyedSingleton<K, V2, L, Bounded>,
2731 ) -> KeyedStream<K, T, L, Bounded, O, R>
2732 where
2733 B: IsBounded,
2734 K: Clone,
2735 T: Clone,
2736 {
2737 keys.keys()
2738 .assume_ordering_trusted::<TotalOrder>(
2739 nondet!(),
2740 )
2741 .cross_product_nested_loop(self.make_bounded())
2742 .into_keyed()
2743 }
2744
2745 pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2782 where
2783 T: Future,
2784 {
2785 Stream::new(
2786 self.location.clone(),
2787 HydroNode::ResolveFuturesBlocking {
2788 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2789 metadata: self
2790 .location
2791 .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2792 },
2793 )
2794 }
2795
2796 #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2816 pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2817 where
2818 B: IsBounded,
2819 {
2820 self.make_bounded()
2821 .assume_ordering_trusted::<TotalOrder>(
2822 nondet!(),
2823 )
2824 .first()
2825 .is_none()
2826 }
2827}
2828
2829impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2830where
2831 L: Location<'a>,
2832{
2833 #[expect(clippy::type_complexity, reason = "ordering / retries propagation")]
2834 pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2859 self,
2860 n: Stream<(K, V2), L, B2, O2, R2>,
2861 ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2862 where
2863 K: Eq + Hash + Clone,
2864 R: MinRetries<R2>,
2865 V1: Clone,
2866 V2: Clone,
2867 {
2868 check_matching_location(&self.location, &n.location);
2869
2870 let ir_node = if B2::BOUNDED {
2871 HydroNode::JoinHalf {
2872 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2873 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2874 metadata: self.location.new_node_metadata(Stream::<
2875 (K, (V1, V2)),
2876 L,
2877 B,
2878 B2::PreserveOrderIfBounded<O>,
2879 <R as MinRetries<R2>>::Min,
2880 >::collection_kind()),
2881 }
2882 } else {
2883 HydroNode::Join {
2884 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2885 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2886 metadata: self.location.new_node_metadata(Stream::<
2887 (K, (V1, V2)),
2888 L,
2889 B,
2890 B2::PreserveOrderIfBounded<O>,
2891 <R as MinRetries<R2>>::Min,
2892 >::collection_kind()),
2893 }
2894 };
2895
2896 Stream::new(self.location.clone(), ir_node)
2897 }
2898
2899 pub fn anti_join<O2: Ordering, R2: Retries>(
2925 self,
2926 n: Stream<K, L, Bounded, O2, R2>,
2927 ) -> Stream<(K, V1), L, B, O, R>
2928 where
2929 K: Eq + Hash,
2930 {
2931 check_matching_location(&self.location, &n.location);
2932
2933 Stream::new(
2934 self.location.clone(),
2935 HydroNode::AntiJoin {
2936 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2937 neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2938 metadata: self
2939 .location
2940 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2941 },
2942 )
2943 }
2944}
2945
2946impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2947 Stream<(K, V), L, B, O, R>
2948{
2949 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2976 KeyedStream::new(
2977 self.location.clone(),
2978 HydroNode::Cast {
2979 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2980 metadata: self
2981 .location
2982 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2983 },
2984 )
2985 }
2986}
2987
2988impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2989where
2990 K: Eq + Hash,
2991 L: Location<'a>,
2992{
2993 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
3012 self.into_keyed()
3013 .fold(
3014 q!(|| ()),
3015 q!(
3016 |_, _| {},
3017 commutative = manual_proof!(),
3018 idempotent = manual_proof!()
3019 ),
3020 )
3021 .keys()
3022 }
3023}
3024
3025impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
3026where
3027 L: Location<'a>,
3028{
3029 pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
3036 self,
3037 tick: &Tick<L2>,
3038 _nondet: NonDet,
3039 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
3040 Stream::new(
3041 tick.drop_consistency(),
3042 HydroNode::Batch {
3043 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3044 metadata: tick
3045 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3046 },
3047 )
3048 }
3049
3050 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
3053 Stream::new(
3054 self.location.tick.l.clone(),
3055 HydroNode::EndAtomic {
3056 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3057 metadata: self
3058 .location
3059 .tick
3060 .l
3061 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
3062 },
3063 )
3064 }
3065}
3066
3067impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
3068where
3069 L: TopLevel<'a>,
3070 F: Future<Output = T>,
3071{
3072 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3103 Stream::new(
3104 self.location.clone(),
3105 HydroNode::ResolveFutures {
3106 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3107 metadata: self
3108 .location
3109 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3110 },
3111 )
3112 }
3113
3114 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3145 Stream::new(
3146 self.location.clone(),
3147 HydroNode::ResolveFuturesOrdered {
3148 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3149 metadata: self
3150 .location
3151 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3152 },
3153 )
3154 }
3155}
3156
3157impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3158where
3159 L: Location<'a>,
3160{
3161 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3164 Stream::new(
3165 self.location.outer().clone(),
3166 HydroNode::YieldConcat {
3167 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3168 metadata: self
3169 .location
3170 .outer()
3171 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3172 },
3173 )
3174 }
3175
3176 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3183 let out_location = Atomic {
3184 tick: self.location.clone(),
3185 };
3186
3187 Stream::new(
3188 out_location.clone(),
3189 HydroNode::YieldConcat {
3190 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3191 metadata: out_location
3192 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3193 },
3194 )
3195 }
3196
3197 pub fn across_ticks<Out: BatchAtomic<'a>>(
3233 self,
3234 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3235 ) -> Out::Batched {
3236 thunk(self.all_ticks_atomic()).batched_atomic()
3237 }
3238
3239 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3278 Stream::new(
3279 self.location.clone(),
3280 HydroNode::DeferTick {
3281 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3282 metadata: self
3283 .location
3284 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3285 },
3286 )
3287 }
3288}
3289
3290#[cfg(test)]
3291mod tests {
3292 #[cfg(feature = "deploy")]
3293 use futures::{SinkExt, StreamExt};
3294 #[cfg(feature = "deploy")]
3295 use hydro_deploy::Deployment;
3296 #[cfg(feature = "deploy")]
3297 use serde::{Deserialize, Serialize};
3298 #[cfg(any(feature = "deploy", feature = "sim"))]
3299 use stageleft::q;
3300
3301 #[cfg(any(feature = "deploy", feature = "sim"))]
3302 use crate::compile::builder::FlowBuilder;
3303 #[cfg(feature = "deploy")]
3304 use crate::live_collections::sliced::sliced;
3305 #[cfg(feature = "deploy")]
3306 use crate::live_collections::stream::ExactlyOnce;
3307 #[cfg(feature = "sim")]
3308 use crate::live_collections::stream::NoOrder;
3309 #[cfg(any(feature = "deploy", feature = "sim"))]
3310 use crate::live_collections::stream::TotalOrder;
3311 #[cfg(any(feature = "deploy", feature = "sim"))]
3312 use crate::location::Location;
3313 #[cfg(feature = "sim")]
3314 use crate::networking::TCP;
3315 #[cfg(any(feature = "deploy", feature = "sim"))]
3316 use crate::nondet::nondet;
3317
3318 mod backtrace_chained_ops;
3319
    /// Marker type used as the process tag for the first node in the deploy tests.
    #[cfg(feature = "deploy")]
    struct P1 {}
    /// Marker type used as the tag for the second process and for the external endpoint
    /// in the deploy tests.
    #[cfg(feature = "deploy")]
    struct P2 {}
3324
    /// Serializable payload sent between processes in `first_ten_distributed`.
    #[cfg(feature = "deploy")]
    #[derive(Serialize, Deserialize, Debug)]
    struct SendOverNetwork {
        // The number carried by this message.
        n: u32,
    }
3330
3331 #[cfg(feature = "deploy")]
3332 #[tokio::test]
3333 async fn first_ten_distributed() {
3334 use crate::networking::TCP;
3335
3336 let mut deployment = Deployment::new();
3337
3338 let mut flow = FlowBuilder::new();
3339 let first_node = flow.process::<P1>();
3340 let second_node = flow.process::<P2>();
3341 let external = flow.external::<P2>();
3342
3343 let numbers = first_node.source_iter(q!(0..10));
3344 let out_port = numbers
3345 .map(q!(|n| SendOverNetwork { n }))
3346 .send(&second_node, TCP.fail_stop().bincode())
3347 .send_bincode_external(&external);
3348
3349 let nodes = flow
3350 .with_process(&first_node, deployment.Localhost())
3351 .with_process(&second_node, deployment.Localhost())
3352 .with_external(&external, deployment.Localhost())
3353 .deploy(&mut deployment);
3354
3355 deployment.deploy().await.unwrap();
3356
3357 let mut external_out = nodes.connect(out_port).await;
3358
3359 deployment.start().await.unwrap();
3360
3361 for i in 0..10 {
3362 assert_eq!(external_out.next().await.unwrap().n, i);
3363 }
3364 }
3365
3366 #[cfg(feature = "deploy")]
3367 #[tokio::test]
3368 async fn first_cardinality() {
3369 let mut deployment = Deployment::new();
3370
3371 let mut flow = FlowBuilder::new();
3372 let node = flow.process::<()>();
3373 let external = flow.external::<()>();
3374
3375 let node_tick = node.tick();
3376 let count = node_tick
3377 .singleton(q!([1, 2, 3]))
3378 .into_stream()
3379 .flatten_ordered()
3380 .first()
3381 .into_stream()
3382 .count()
3383 .all_ticks()
3384 .send_bincode_external(&external);
3385
3386 let nodes = flow
3387 .with_process(&node, deployment.Localhost())
3388 .with_external(&external, deployment.Localhost())
3389 .deploy(&mut deployment);
3390
3391 deployment.deploy().await.unwrap();
3392
3393 let mut external_out = nodes.connect(count).await;
3394
3395 deployment.start().await.unwrap();
3396
3397 assert_eq!(external_out.next().await.unwrap(), 1);
3398 }
3399
3400 #[cfg(feature = "deploy")]
3401 #[tokio::test]
3402 async fn unbounded_reduce_remembers_state() {
3403 let mut deployment = Deployment::new();
3404
3405 let mut flow = FlowBuilder::new();
3406 let node = flow.process::<()>();
3407 let external = flow.external::<()>();
3408
3409 let (input_port, input) = node.source_external_bincode(&external);
3410 let out = input
3411 .reduce(q!(|acc, v| *acc += v))
3412 .sample_eager(nondet!())
3413 .send_bincode_external(&external);
3414
3415 let nodes = flow
3416 .with_process(&node, deployment.Localhost())
3417 .with_external(&external, deployment.Localhost())
3418 .deploy(&mut deployment);
3419
3420 deployment.deploy().await.unwrap();
3421
3422 let mut external_in = nodes.connect(input_port).await;
3423 let mut external_out = nodes.connect(out).await;
3424
3425 deployment.start().await.unwrap();
3426
3427 external_in.send(1).await.unwrap();
3428 assert_eq!(external_out.next().await.unwrap(), 1);
3429
3430 external_in.send(2).await.unwrap();
3431 assert_eq!(external_out.next().await.unwrap(), 3);
3432 }
3433
3434 #[cfg(feature = "deploy")]
3435 #[tokio::test]
3436 async fn top_level_bounded_cross_singleton() {
3437 let mut deployment = Deployment::new();
3438
3439 let mut flow = FlowBuilder::new();
3440 let node = flow.process::<()>();
3441 let external = flow.external::<()>();
3442
3443 let (input_port, input) =
3444 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3445
3446 let out = input
3447 .cross_singleton(
3448 node.source_iter(q!(vec![1, 2, 3]))
3449 .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3450 )
3451 .send_bincode_external(&external);
3452
3453 let nodes = flow
3454 .with_process(&node, deployment.Localhost())
3455 .with_external(&external, deployment.Localhost())
3456 .deploy(&mut deployment);
3457
3458 deployment.deploy().await.unwrap();
3459
3460 let mut external_in = nodes.connect(input_port).await;
3461 let mut external_out = nodes.connect(out).await;
3462
3463 deployment.start().await.unwrap();
3464
3465 external_in.send(1).await.unwrap();
3466 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3467
3468 external_in.send(2).await.unwrap();
3469 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3470 }
3471
3472 #[cfg(feature = "deploy")]
3473 #[tokio::test]
3474 async fn top_level_bounded_reduce_cardinality() {
3475 let mut deployment = Deployment::new();
3476
3477 let mut flow = FlowBuilder::new();
3478 let node = flow.process::<()>();
3479 let external = flow.external::<()>();
3480
3481 let (input_port, input) =
3482 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3483
3484 let out = sliced! {
3485 let input = use(input, nondet!());
3486 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!());
3487 input.cross_singleton(v.into_stream().count())
3488 }
3489 .send_bincode_external(&external);
3490
3491 let nodes = flow
3492 .with_process(&node, deployment.Localhost())
3493 .with_external(&external, deployment.Localhost())
3494 .deploy(&mut deployment);
3495
3496 deployment.deploy().await.unwrap();
3497
3498 let mut external_in = nodes.connect(input_port).await;
3499 let mut external_out = nodes.connect(out).await;
3500
3501 deployment.start().await.unwrap();
3502
3503 external_in.send(1).await.unwrap();
3504 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3505
3506 external_in.send(2).await.unwrap();
3507 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3508 }
3509
3510 #[cfg(feature = "deploy")]
3511 #[tokio::test]
3512 async fn top_level_bounded_into_singleton_cardinality() {
3513 let mut deployment = Deployment::new();
3514
3515 let mut flow = FlowBuilder::new();
3516 let node = flow.process::<()>();
3517 let external = flow.external::<()>();
3518
3519 let (input_port, input) =
3520 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3521
3522 let out = sliced! {
3523 let input = use(input, nondet!());
3524 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!());
3525 input.cross_singleton(v.into_stream().count())
3526 }
3527 .send_bincode_external(&external);
3528
3529 let nodes = flow
3530 .with_process(&node, deployment.Localhost())
3531 .with_external(&external, deployment.Localhost())
3532 .deploy(&mut deployment);
3533
3534 deployment.deploy().await.unwrap();
3535
3536 let mut external_in = nodes.connect(input_port).await;
3537 let mut external_out = nodes.connect(out).await;
3538
3539 deployment.start().await.unwrap();
3540
3541 external_in.send(1).await.unwrap();
3542 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3543
3544 external_in.send(2).await.unwrap();
3545 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3546 }
3547
3548 #[cfg(feature = "deploy")]
3549 #[tokio::test]
3550 async fn atomic_fold_replays_each_tick() {
3551 let mut deployment = Deployment::new();
3552
3553 let mut flow = FlowBuilder::new();
3554 let node = flow.process::<()>();
3555 let external = flow.external::<()>();
3556
3557 let (input_port, input) =
3558 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3559 let tick = node.tick();
3560
3561 let out = input
3562 .batch(&tick, nondet!())
3563 .cross_singleton(
3564 node.source_iter(q!(vec![1, 2, 3]))
3565 .atomic()
3566 .fold(q!(|| 0), q!(|acc, v| *acc += v))
3567 .snapshot_atomic(&tick, nondet!()),
3568 )
3569 .all_ticks()
3570 .send_bincode_external(&external);
3571
3572 let nodes = flow
3573 .with_process(&node, deployment.Localhost())
3574 .with_external(&external, deployment.Localhost())
3575 .deploy(&mut deployment);
3576
3577 deployment.deploy().await.unwrap();
3578
3579 let mut external_in = nodes.connect(input_port).await;
3580 let mut external_out = nodes.connect(out).await;
3581
3582 deployment.start().await.unwrap();
3583
3584 external_in.send(1).await.unwrap();
3585 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3586
3587 external_in.send(2).await.unwrap();
3588 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3589 }
3590
3591 #[cfg(feature = "deploy")]
3592 #[tokio::test]
3593 async fn unbounded_scan_remembers_state() {
3594 let mut deployment = Deployment::new();
3595
3596 let mut flow = FlowBuilder::new();
3597 let node = flow.process::<()>();
3598 let external = flow.external::<()>();
3599
3600 let (input_port, input) = node.source_external_bincode(&external);
3601 let out = input
3602 .scan(
3603 q!(|| 0),
3604 q!(|acc, v| {
3605 *acc += v;
3606 Some(*acc)
3607 }),
3608 )
3609 .send_bincode_external(&external);
3610
3611 let nodes = flow
3612 .with_process(&node, deployment.Localhost())
3613 .with_external(&external, deployment.Localhost())
3614 .deploy(&mut deployment);
3615
3616 deployment.deploy().await.unwrap();
3617
3618 let mut external_in = nodes.connect(input_port).await;
3619 let mut external_out = nodes.connect(out).await;
3620
3621 deployment.start().await.unwrap();
3622
3623 external_in.send(1).await.unwrap();
3624 assert_eq!(external_out.next().await.unwrap(), 1);
3625
3626 external_in.send(2).await.unwrap();
3627 assert_eq!(external_out.next().await.unwrap(), 3);
3628 }
3629
3630 #[cfg(feature = "deploy")]
3631 #[tokio::test]
3632 async fn unbounded_enumerate_remembers_state() {
3633 let mut deployment = Deployment::new();
3634
3635 let mut flow = FlowBuilder::new();
3636 let node = flow.process::<()>();
3637 let external = flow.external::<()>();
3638
3639 let (input_port, input) = node.source_external_bincode(&external);
3640 let out = input.enumerate().send_bincode_external(&external);
3641
3642 let nodes = flow
3643 .with_process(&node, deployment.Localhost())
3644 .with_external(&external, deployment.Localhost())
3645 .deploy(&mut deployment);
3646
3647 deployment.deploy().await.unwrap();
3648
3649 let mut external_in = nodes.connect(input_port).await;
3650 let mut external_out = nodes.connect(out).await;
3651
3652 deployment.start().await.unwrap();
3653
3654 external_in.send(1).await.unwrap();
3655 assert_eq!(external_out.next().await.unwrap(), (0, 1));
3656
3657 external_in.send(2).await.unwrap();
3658 assert_eq!(external_out.next().await.unwrap(), (1, 2));
3659 }
3660
3661 #[cfg(feature = "deploy")]
3662 #[tokio::test]
3663 async fn unbounded_unique_remembers_state() {
3664 let mut deployment = Deployment::new();
3665
3666 let mut flow = FlowBuilder::new();
3667 let node = flow.process::<()>();
3668 let external = flow.external::<()>();
3669
3670 let (input_port, input) =
3671 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3672 let out = input.unique().send_bincode_external(&external);
3673
3674 let nodes = flow
3675 .with_process(&node, deployment.Localhost())
3676 .with_external(&external, deployment.Localhost())
3677 .deploy(&mut deployment);
3678
3679 deployment.deploy().await.unwrap();
3680
3681 let mut external_in = nodes.connect(input_port).await;
3682 let mut external_out = nodes.connect(out).await;
3683
3684 deployment.start().await.unwrap();
3685
3686 external_in.send(1).await.unwrap();
3687 assert_eq!(external_out.next().await.unwrap(), 1);
3688
3689 external_in.send(2).await.unwrap();
3690 assert_eq!(external_out.next().await.unwrap(), 2);
3691
3692 external_in.send(1).await.unwrap();
3693 external_in.send(3).await.unwrap();
3694 assert_eq!(external_out.next().await.unwrap(), 3);
3695 }
3696
3697 #[cfg(feature = "sim")]
3698 #[test]
3699 #[should_panic]
3700 fn sim_batch_nondet_size() {
3701 let mut flow = FlowBuilder::new();
3702 let node = flow.process::<()>();
3703
3704 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3705
3706 let tick = node.tick();
3707 let out_recv = input
3708 .batch(&tick, nondet!())
3709 .count()
3710 .all_ticks()
3711 .sim_output();
3712
3713 flow.sim().exhaustive(async || {
3714 in_send.send(());
3715 in_send.send(());
3716 in_send.send(());
3717
3718 assert_eq!(out_recv.next().await.unwrap(), 3); });
3720 }
3721
3722 #[cfg(feature = "sim")]
3723 #[test]
3724 fn sim_batch_preserves_order() {
3725 let mut flow = FlowBuilder::new();
3726 let node = flow.process::<()>();
3727
3728 let (in_send, input) = node.sim_input();
3729
3730 let tick = node.tick();
3731 let out_recv = input
3732 .batch(&tick, nondet!())
3733 .all_ticks()
3734 .sim_output();
3735
3736 flow.sim().exhaustive(async || {
3737 in_send.send(1);
3738 in_send.send(2);
3739 in_send.send(3);
3740
3741 out_recv.assert_yields_only([1, 2, 3]).await;
3742 });
3743 }
3744
3745 #[cfg(feature = "sim")]
3746 #[test]
3747 #[should_panic]
3748 fn sim_batch_unordered_shuffles() {
3749 let mut flow = FlowBuilder::new();
3750 let node = flow.process::<()>();
3751
3752 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3753
3754 let tick = node.tick();
3755 let batch = input.batch(&tick, nondet!());
3756 let out_recv = batch
3757 .clone()
3758 .min()
3759 .zip(batch.max())
3760 .all_ticks()
3761 .sim_output();
3762
3763 flow.sim().exhaustive(async || {
3764 in_send.send_many_unordered([1, 2, 3]);
3765
3766 if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3767 panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3768 }
3769 });
3770 }
3771
3772 #[cfg(feature = "sim")]
3773 #[test]
3774 fn sim_batch_unordered_shuffles_count() {
3775 let mut flow = FlowBuilder::new();
3776 let node = flow.process::<()>();
3777
3778 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3779
3780 let tick = node.tick();
3781 let batch = input.batch(&tick, nondet!());
3782 let out_recv = batch.all_ticks().sim_output();
3783
3784 let instance_count = flow.sim().exhaustive(async || {
3785 in_send.send_many_unordered([1, 2, 3, 4]);
3786 out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3787 });
3788
3789 assert_eq!(
3790 instance_count,
3791 75 )
3793 }
3794
3795 #[cfg(feature = "sim")]
3796 #[test]
3797 #[should_panic]
3798 fn sim_observe_order_batched() {
3799 let mut flow = FlowBuilder::new();
3800 let node = flow.process::<()>();
3801
3802 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3803
3804 let tick = node.tick();
3805 let batch = input.batch(&tick, nondet!());
3806 let out_recv = batch
3807 .assume_ordering::<TotalOrder>(nondet!())
3808 .all_ticks()
3809 .sim_output();
3810
3811 flow.sim().exhaustive(async || {
3812 in_send.send_many_unordered([1, 2, 3, 4]);
3813 out_recv.assert_yields_only([1, 2, 3, 4]).await; });
3815 }
3816
3817 #[cfg(feature = "sim")]
3818 #[test]
3819 fn sim_observe_order_batched_count() {
3820 let mut flow = FlowBuilder::new();
3821 let node = flow.process::<()>();
3822
3823 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3824
3825 let tick = node.tick();
3826 let batch = input.batch(&tick, nondet!());
3827 let out_recv = batch
3828 .assume_ordering::<TotalOrder>(nondet!())
3829 .all_ticks()
3830 .sim_output();
3831
3832 let instance_count = flow.sim().exhaustive(async || {
3833 in_send.send_many_unordered([1, 2, 3, 4]);
3834 let _ = out_recv.collect::<Vec<_>>().await;
3835 });
3836
3837 assert_eq!(
3838 instance_count,
3839 192 )
3841 }
3842
3843 #[cfg(feature = "sim")]
3844 #[test]
3845 fn sim_unordered_count_instance_count() {
3846 let mut flow = FlowBuilder::new();
3847 let node = flow.process::<()>();
3848
3849 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3850
3851 let tick = node.tick();
3852 let out_recv = input
3853 .count()
3854 .snapshot(&tick, nondet!())
3855 .all_ticks()
3856 .sim_output();
3857
3858 let instance_count = flow.sim().exhaustive(async || {
3859 in_send.send_many_unordered([1, 2, 3, 4]);
3860 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3861 });
3862
3863 assert_eq!(
3864 instance_count,
3865 16 )
3867 }
3868
3869 #[cfg(feature = "sim")]
3870 #[test]
3871 fn sim_top_level_assume_ordering() {
3872 let mut flow = FlowBuilder::new();
3873 let node = flow.process::<()>();
3874
3875 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3876
3877 let out_recv = input
3878 .assume_ordering::<TotalOrder>(nondet!())
3879 .sim_output();
3880
3881 let instance_count = flow.sim().exhaustive(async || {
3882 in_send.send_many_unordered([1, 2, 3]);
3883 let mut out = out_recv.collect::<Vec<_>>().await;
3884 out.sort();
3885 assert_eq!(out, vec![1, 2, 3]);
3886 });
3887
3888 assert_eq!(instance_count, 6)
3889 }
3890
3891 #[cfg(feature = "sim")]
3892 #[test]
3893 fn sim_top_level_assume_ordering_cycle_back() {
3894 let mut flow = FlowBuilder::new();
3895 let node = flow.process::<()>();
3896 let node2 = flow.process::<()>();
3897
3898 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3899
3900 let (complete_cycle_back, cycle_back) =
3901 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3902 let ordered = input
3903 .merge_unordered(cycle_back)
3904 .assume_ordering::<TotalOrder>(nondet!());
3905 complete_cycle_back.complete(
3906 ordered
3907 .clone()
3908 .map(q!(|v| v + 1))
3909 .filter(q!(|v| v % 2 == 1))
3910 .send(&node2, TCP.fail_stop().bincode())
3911 .send(&node, TCP.fail_stop().bincode()),
3912 );
3913
3914 let out_recv = ordered.sim_output();
3915
3916 let mut saw = false;
3917 let instance_count = flow.sim().exhaustive(async || {
3918 in_send.send_many_unordered([0, 2]);
3919 let out = out_recv.collect::<Vec<_>>().await;
3920
3921 if out.starts_with(&[0, 1, 2]) {
3922 saw = true;
3923 }
3924 });
3925
3926 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3927 assert_eq!(instance_count, 6);
3928 }
3929
3930 #[cfg(feature = "sim")]
3931 #[test]
3932 fn sim_top_level_assume_ordering_cycle_back_tick() {
3933 let mut flow = FlowBuilder::new();
3934 let node = flow.process::<()>();
3935 let node2 = flow.process::<()>();
3936
3937 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3938
3939 let (complete_cycle_back, cycle_back) =
3940 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3941 let ordered = input
3942 .merge_unordered(cycle_back)
3943 .assume_ordering::<TotalOrder>(nondet!());
3944 complete_cycle_back.complete(
3945 ordered
3946 .clone()
3947 .batch(&node.tick(), nondet!())
3948 .all_ticks()
3949 .map(q!(|v| v + 1))
3950 .filter(q!(|v| v % 2 == 1))
3951 .send(&node2, TCP.fail_stop().bincode())
3952 .send(&node, TCP.fail_stop().bincode()),
3953 );
3954
3955 let out_recv = ordered.sim_output();
3956
3957 let mut saw = false;
3958 let instance_count = flow.sim().exhaustive(async || {
3959 in_send.send_many_unordered([0, 2]);
3960 let out = out_recv.collect::<Vec<_>>().await;
3961
3962 if out.starts_with(&[0, 1, 2]) {
3963 saw = true;
3964 }
3965 });
3966
3967 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3968 assert_eq!(instance_count, 58);
3969 }
3970
3971 #[cfg(feature = "sim")]
3972 #[test]
3973 fn sim_top_level_assume_ordering_multiple() {
3974 let mut flow = FlowBuilder::new();
3975 let node = flow.process::<()>();
3976 let node2 = flow.process::<()>();
3977
3978 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3979 let (_, input2) = node.sim_input::<_, NoOrder, _>();
3980
3981 let (complete_cycle_back, cycle_back) =
3982 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3983 let input1_ordered = input
3984 .clone()
3985 .merge_unordered(cycle_back)
3986 .assume_ordering::<TotalOrder>(nondet!());
3987 let foo = input1_ordered
3988 .clone()
3989 .map(q!(|v| v + 3))
3990 .weaken_ordering::<NoOrder>()
3991 .merge_unordered(input2)
3992 .assume_ordering::<TotalOrder>(nondet!());
3993
3994 complete_cycle_back.complete(
3995 foo.filter(q!(|v| *v == 3))
3996 .send(&node2, TCP.fail_stop().bincode())
3997 .send(&node, TCP.fail_stop().bincode()),
3998 );
3999
4000 let out_recv = input1_ordered.sim_output();
4001
4002 let mut saw = false;
4003 let instance_count = flow.sim().exhaustive(async || {
4004 in_send.send_many_unordered([0, 1]);
4005 let out = out_recv.collect::<Vec<_>>().await;
4006
4007 if out.starts_with(&[0, 3, 1]) {
4008 saw = true;
4009 }
4010 });
4011
4012 assert!(saw, "did not see an instance with 0, 3, 1 in order");
4013 assert_eq!(instance_count, 24);
4014 }
4015
4016 #[cfg(feature = "sim")]
4017 #[test]
4018 fn sim_atomic_assume_ordering_cycle_back() {
4019 let mut flow = FlowBuilder::new();
4020 let node = flow.process::<()>();
4021 let node2 = flow.process::<()>();
4022
4023 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
4024
4025 let (complete_cycle_back, cycle_back) =
4026 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
4027 let ordered = input
4028 .merge_unordered(cycle_back)
4029 .atomic()
4030 .assume_ordering::<TotalOrder>(nondet!())
4031 .end_atomic();
4032 complete_cycle_back.complete(
4033 ordered
4034 .clone()
4035 .map(q!(|v| v + 1))
4036 .filter(q!(|v| v % 2 == 1))
4037 .send(&node2, TCP.fail_stop().bincode())
4038 .send(&node, TCP.fail_stop().bincode()),
4039 );
4040
4041 let out_recv = ordered.sim_output();
4042
4043 let instance_count = flow.sim().exhaustive(async || {
4044 in_send.send_many_unordered([0, 2]);
4045 let out = out_recv.collect::<Vec<_>>().await;
4046 assert_eq!(out.len(), 4);
4047 });
4048 assert_eq!(instance_count, 22);
4049 }
4050
4051 #[cfg(feature = "deploy")]
4052 #[tokio::test]
4053 async fn partition_evens_odds() {
4054 let mut deployment = Deployment::new();
4055
4056 let mut flow = FlowBuilder::new();
4057 let node = flow.process::<()>();
4058 let external = flow.external::<()>();
4059
4060 let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
4061 let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
4062 let evens_port = evens.send_bincode_external(&external);
4063 let odds_port = odds.send_bincode_external(&external);
4064
4065 let nodes = flow
4066 .with_process(&node, deployment.Localhost())
4067 .with_external(&external, deployment.Localhost())
4068 .deploy(&mut deployment);
4069
4070 deployment.deploy().await.unwrap();
4071
4072 let mut evens_out = nodes.connect(evens_port).await;
4073 let mut odds_out = nodes.connect(odds_port).await;
4074
4075 deployment.start().await.unwrap();
4076
4077 let mut even_results = Vec::new();
4078 for _ in 0..3 {
4079 even_results.push(evens_out.next().await.unwrap());
4080 }
4081 even_results.sort();
4082 assert_eq!(even_results, vec![2, 4, 6]);
4083
4084 let mut odd_results = Vec::new();
4085 for _ in 0..3 {
4086 odd_results.push(odds_out.next().await.unwrap());
4087 }
4088 odd_results.sort();
4089 assert_eq!(odd_results, vec![1, 3, 5]);
4090 }
4091
4092 #[cfg(feature = "deploy")]
4093 #[tokio::test]
4094 async fn unconsumed_inspect_still_runs() {
4095 use crate::deploy::DeployCrateWrapper;
4096
4097 let mut deployment = Deployment::new();
4098
4099 let mut flow = FlowBuilder::new();
4100 let node = flow.process::<()>();
4101
4102 node.source_iter(q!(0..5))
4105 .inspect(q!(|x| println!("inspect: {}", x)));
4106
4107 let nodes = flow
4108 .with_process(&node, deployment.Localhost())
4109 .deploy(&mut deployment);
4110
4111 deployment.deploy().await.unwrap();
4112
4113 let mut stdout = nodes.get_process(&node).stdout();
4114
4115 deployment.start().await.unwrap();
4116
4117 let mut lines = Vec::new();
4118 for _ in 0..5 {
4119 lines.push(stdout.recv().await.unwrap());
4120 }
4121 lines.sort();
4122 assert_eq!(
4123 lines,
4124 vec![
4125 "inspect: 0",
4126 "inspect: 1",
4127 "inspect: 2",
4128 "inspect: 3",
4129 "inspect: 4",
4130 ]
4131 );
4132 }
4133
4134 #[cfg(feature = "sim")]
4135 #[test]
4136 fn sim_limit() {
4137 let mut flow = FlowBuilder::new();
4138 let node = flow.process::<()>();
4139
4140 let (in_send, input) = node.sim_input();
4141
4142 let out_recv = input.limit(q!(3)).sim_output();
4143
4144 flow.sim().exhaustive(async || {
4145 in_send.send(1);
4146 in_send.send(2);
4147 in_send.send(3);
4148 in_send.send(4);
4149 in_send.send(5);
4150
4151 out_recv.assert_yields_only([1, 2, 3]).await;
4152 });
4153 }
4154
4155 #[cfg(feature = "sim")]
4156 #[test]
4157 fn sim_limit_zero() {
4158 let mut flow = FlowBuilder::new();
4159 let node = flow.process::<()>();
4160
4161 let (in_send, input) = node.sim_input();
4162
4163 let out_recv = input.limit(q!(0)).sim_output();
4164
4165 flow.sim().exhaustive(async || {
4166 in_send.send(1);
4167 in_send.send(2);
4168
4169 out_recv.assert_yields_only::<i32, _>([]).await;
4170 });
4171 }
4172
4173 #[cfg(feature = "sim")]
4174 #[test]
4175 fn sim_merge_ordered() {
4176 let mut flow = FlowBuilder::new();
4177 let node = flow.process::<()>();
4178
4179 let (in_send, input) = node.sim_input();
4180 let (in_send2, input2) = node.sim_input();
4181
4182 let out_recv = input
4183 .merge_ordered(input2, nondet!())
4184 .sim_output();
4185
4186 let mut saw_out_of_order = false;
4187 let instances = flow.sim().exhaustive(async || {
4188 in_send.send(1);
4189 in_send.send(2);
4190 in_send2.send(3);
4191 in_send2.send(4);
4192
4193 let out = out_recv.collect::<Vec<_>>().await;
4194
4195 if out == [1, 3, 2, 4] {
4196 saw_out_of_order = true;
4197 }
4198
4199 let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
4202 let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
4203 assert_eq!(
4204 first_elements,
4205 vec![1, 2],
4206 "first input order violated: {:?}",
4207 out
4208 );
4209 assert_eq!(
4210 second_elements,
4211 vec![3, 4],
4212 "second input order violated: {:?}",
4213 out
4214 );
4215
4216 first_elements.append(&mut second_elements);
4217 first_elements.sort();
4218 assert_eq!(first_elements, vec![1, 2, 3, 4]);
4219 });
4220
4221 assert!(saw_out_of_order);
4222 assert_eq!(instances, 6);
4223 }
4224
4225 #[cfg(feature = "sim")]
4228 #[test]
4229 fn sim_merge_ordered_one_empty() {
4230 let mut flow = FlowBuilder::new();
4231 let node = flow.process::<()>();
4232
4233 let (in_send, input) = node.sim_input();
4234 let (_in_send2, input2) = node.sim_input();
4235
4236 let out_recv = input
4237 .merge_ordered(input2, nondet!())
4238 .sim_output();
4239
4240 let instances = flow.sim().exhaustive(async || {
4241 in_send.send(1);
4242 in_send.send(2);
4243
4244 let out = out_recv.collect::<Vec<_>>().await;
4245 assert_eq!(out, vec![1, 2]);
4246 });
4247
4248 assert_eq!(instances, 1);
4250 }
4251
4252 #[cfg(feature = "sim")]
4258 #[test]
4259 fn sim_merge_ordered_cycle_back() {
4260 let mut flow = FlowBuilder::new();
4261 let node = flow.process::<()>();
4262
4263 let (in_send, input) = node.sim_input();
4264
4265 let (complete_cycle_back, cycle_back) =
4267 node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4268
4269 let merged = input.merge_ordered(cycle_back, nondet!());
4271
4272 complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4274
4275 let out_recv = merged.sim_output();
4276
4277 let mut saw_cycle_before_second = false;
4280 flow.sim().exhaustive(async || {
4281 in_send.send(1);
4282 in_send.send(2);
4283
4284 let out = out_recv.collect::<Vec<_>>().await;
4285
4286 let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4288 let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4289 assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4290
4291 if out == [1, 10, 2] {
4293 saw_cycle_before_second = true;
4294 }
4295
4296 let mut sorted = out;
4297 sorted.sort();
4298 assert_eq!(sorted, vec![1, 2, 10]);
4299 });
4300
4301 assert!(
4302 saw_cycle_before_second,
4303 "never saw the cycled element arrive before the second input element"
4304 );
4305 }
4306
4307 #[cfg(feature = "sim")]
4311 #[test]
4312 fn sim_merge_ordered_delayed() {
4313 let mut flow = FlowBuilder::new();
4314 let node = flow.process::<()>();
4315
4316 let (in_send, input) = node.sim_input();
4317 let (in_send2, input2) = node.sim_input();
4318
4319 let out_recv = input
4320 .merge_ordered(input2, nondet!())
4321 .sim_output();
4322
4323 let mut saw_delayed_interleaving = false;
4324 flow.sim().exhaustive(async || {
4325 in_send.send(1);
4327 in_send2.send(3);
4328 in_send2.send(4);
4329
4330 let first_batch = out_recv.collect::<Vec<_>>().await;
4332
4333 in_send.send(2);
4335 let second_batch = out_recv.collect::<Vec<_>>().await;
4336
4337 let mut all: Vec<_> = first_batch
4338 .iter()
4339 .chain(second_batch.iter())
4340 .copied()
4341 .collect();
4342
4343 if all == [1, 3, 4, 2] {
4345 saw_delayed_interleaving = true;
4346 }
4347
4348 all.sort();
4349 assert_eq!(all, vec![1, 2, 3, 4]);
4350 });
4351
4352 assert!(saw_delayed_interleaving);
4353 }
4354
4355 #[cfg(feature = "deploy")]
4360 #[tokio::test]
4361 async fn deploy_merge_ordered_delayed() {
4362 let mut deployment = Deployment::new();
4363
4364 let mut flow = FlowBuilder::new();
4365 let node = flow.process::<()>();
4366 let external = flow.external::<()>();
4367
4368 let (input_a_port, input_a) = node.source_external_bincode(&external);
4369 let (input_b_port, input_b) = node.source_external_bincode(&external);
4370
4371 let out = input_a
4372 .assume_ordering(nondet!())
4373 .merge_ordered(
4374 input_b.assume_ordering(nondet!()),
4375 nondet!(),
4376 )
4377 .send_bincode_external(&external);
4378
4379 let nodes = flow
4380 .with_process(&node, deployment.Localhost())
4381 .with_external(&external, deployment.Localhost())
4382 .deploy(&mut deployment);
4383
4384 deployment.deploy().await.unwrap();
4385
4386 let mut ext_a = nodes.connect(input_a_port).await;
4387 let mut ext_b = nodes.connect(input_b_port).await;
4388 let mut ext_out = nodes.connect(out).await;
4389
4390 deployment.start().await.unwrap();
4391
4392 ext_a.send(1).await.unwrap();
4394 ext_b.send(3).await.unwrap();
4395 ext_b.send(4).await.unwrap();
4396
4397 let mut received = Vec::new();
4399 for _ in 0..3 {
4400 received.push(ext_out.next().await.unwrap());
4401 }
4402
4403 ext_a.send(2).await.unwrap();
4405 received.push(ext_out.next().await.unwrap());
4406
4407 received.sort();
4409 assert_eq!(received, vec![1, 2, 3, 4]);
4410 }
4411
4412 #[cfg(feature = "deploy")]
4413 #[tokio::test]
4414 async fn monotone_fold_threshold() {
4415 use crate::properties::manual_proof;
4416
4417 let mut deployment = Deployment::new();
4418
4419 let mut flow = FlowBuilder::new();
4420 let node = flow.process::<()>();
4421 let external = flow.external::<()>();
4422
4423 let in_unbounded: super::Stream<_, _> =
4424 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4425 let sum = in_unbounded.fold(
4426 q!(|| 0),
4427 q!(
4428 |sum, v| {
4429 *sum += v;
4430 },
4431 monotone = manual_proof!()
4432 ),
4433 );
4434
4435 let threshold_out = sum
4436 .threshold_greater_or_equal(node.singleton(q!(7)))
4437 .send_bincode_external(&external);
4438
4439 let nodes = flow
4440 .with_process(&node, deployment.Localhost())
4441 .with_external(&external, deployment.Localhost())
4442 .deploy(&mut deployment);
4443
4444 deployment.deploy().await.unwrap();
4445
4446 let mut threshold_out = nodes.connect(threshold_out).await;
4447
4448 deployment.start().await.unwrap();
4449
4450 assert_eq!(threshold_out.next().await.unwrap(), 7);
4451 }
4452
4453 #[cfg(feature = "deploy")]
4454 #[tokio::test]
4455 async fn monotone_count_threshold() {
4456 let mut deployment = Deployment::new();
4457
4458 let mut flow = FlowBuilder::new();
4459 let node = flow.process::<()>();
4460 let external = flow.external::<()>();
4461
4462 let in_unbounded: super::Stream<_, _> =
4463 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4464 let sum = in_unbounded.count();
4465
4466 let threshold_out = sum
4467 .threshold_greater_or_equal(node.singleton(q!(3)))
4468 .send_bincode_external(&external);
4469
4470 let nodes = flow
4471 .with_process(&node, deployment.Localhost())
4472 .with_external(&external, deployment.Localhost())
4473 .deploy(&mut deployment);
4474
4475 deployment.deploy().await.unwrap();
4476
4477 let mut threshold_out = nodes.connect(threshold_out).await;
4478
4479 deployment.start().await.unwrap();
4480
4481 assert_eq!(threshold_out.next().await.unwrap(), 3);
4482 }
4483
4484 #[cfg(feature = "deploy")]
4485 #[tokio::test]
4486 async fn monotone_map_order_preserving_threshold() {
4487 use crate::properties::manual_proof;
4488
4489 let mut deployment = Deployment::new();
4490
4491 let mut flow = FlowBuilder::new();
4492 let node = flow.process::<()>();
4493 let external = flow.external::<()>();
4494
4495 let in_unbounded: super::Stream<_, _> =
4496 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4497 let sum = in_unbounded.fold(
4498 q!(|| 0),
4499 q!(
4500 |sum, v| {
4501 *sum += v;
4502 },
4503 monotone = manual_proof!()
4504 ),
4505 );
4506
4507 let doubled = sum.map(q!(
4509 |v| v * 2,
4510 order_preserving = manual_proof!()
4511 ));
4512
4513 let threshold_out = doubled
4514 .threshold_greater_or_equal(node.singleton(q!(14)))
4515 .send_bincode_external(&external);
4516
4517 let nodes = flow
4518 .with_process(&node, deployment.Localhost())
4519 .with_external(&external, deployment.Localhost())
4520 .deploy(&mut deployment);
4521
4522 deployment.deploy().await.unwrap();
4523
4524 let mut threshold_out = nodes.connect(threshold_out).await;
4525
4526 deployment.start().await.unwrap();
4527
4528 assert_eq!(threshold_out.next().await.unwrap(), 14);
4529 }
4530
    /// Compile-time checks on the result types of `join` and `cross_product`.
    ///
    /// None of these functions is ever called (hence `#[expect(dead_code)]`); each one only
    /// has to type-check, which pins the boundedness and ordering parameters of the stream
    /// returned for the given argument types.
    #[cfg(any(feature = "deploy", feature = "sim"))]
    mod join_ordering_type_tests {
        use crate::live_collections::boundedness::{Bounded, Unbounded};
        use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
        use crate::location::{Location, Process};

        /// `join`: unbounded `TotalOrder` left with bounded `TotalOrder` right must be typed
        /// as an unbounded `TotalOrder` stream.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_unbounded_with_bounded_preserves_order<'a>(
            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
            left.join(right)
        }

        /// `join`: two unbounded `TotalOrder` inputs must be typed as an unbounded `NoOrder`
        /// stream.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_unbounded_with_unbounded_is_no_order<'a>(
            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
            left.join(right)
        }

        /// `join`: two bounded `TotalOrder` inputs at any location `L` must be typed as a
        /// bounded `TotalOrder` stream.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
            left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
            right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
            left.join(right)
        }

        /// `join`: unbounded `NoOrder` left with bounded `NoOrder` right must be typed as an
        /// unbounded `NoOrder` stream.
        #[expect(dead_code, reason = "compile-time type test")]
        fn join_unbounded_noorder_with_bounded<'a>(
            left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
            right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
            left.join(right)
        }

        /// `cross_product`: unbounded `TotalOrder` left with bounded `TotalOrder` right must
        /// be typed as an unbounded `TotalOrder` stream.
        #[expect(dead_code, reason = "compile-time type test")]
        fn cross_product_unbounded_with_bounded_preserves_order<'a>(
            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
            left.cross_product(right)
        }

        /// `cross_product`: two bounded `TotalOrder` inputs must be typed as a bounded
        /// `TotalOrder` stream.
        #[expect(dead_code, reason = "compile-time type test")]
        fn cross_product_bounded_with_bounded_preserves_order<'a>(
            left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
            left.cross_product(right)
        }

        /// `cross_product`: two unbounded `TotalOrder` inputs must be typed as an unbounded
        /// `NoOrder` stream.
        #[expect(dead_code, reason = "compile-time type test")]
        fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
            right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
        ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
            left.cross_product(right)
        }
    }

    #[cfg(feature = "sim")]
4601 #[test]
4602 fn cross_product_mixed_boundedness_correctness() {
4603 use stageleft::q;
4604
4605 use crate::compile::builder::FlowBuilder;
4606 use crate::nondet::nondet;
4607
4608 let mut flow = FlowBuilder::new();
4609 let process = flow.process::<()>();
4610 let tick = process.tick();
4611
4612 let left = process.source_iter(q!(vec![1, 2]));
4613 let right = process
4614 .source_iter(q!(vec!['a', 'b']))
4615 .batch(&tick, nondet!())
4616 .all_ticks();
4617
4618 let out = left.cross_product(right).sim_output();
4619
4620 flow.sim().exhaustive(async || {
4621 out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4622 .await;
4623 });
4624 }
4625
4626 #[cfg(feature = "sim")]
4627 #[test]
4628 fn join_mixed_boundedness_correctness() {
4629 use stageleft::q;
4630
4631 use crate::compile::builder::FlowBuilder;
4632 use crate::nondet::nondet;
4633
4634 let mut flow = FlowBuilder::new();
4635 let process = flow.process::<()>();
4636 let tick = process.tick();
4637
4638 let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4639 let right = process
4640 .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4641 .batch(&tick, nondet!())
4642 .all_ticks();
4643
4644 let out = left.join(right).sim_output();
4645
4646 flow.sim().exhaustive(async || {
4647 out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4648 .await;
4649 });
4650 }
4651
4652 #[cfg(feature = "sim")]
4653 #[test]
4654 fn sim_merge_unordered_independent_atomics() {
4655 let mut flow = FlowBuilder::new();
4656 let node = flow.process::<()>();
4657
4658 let (in1_send, input1) = node.sim_input::<_, TotalOrder, _>();
4659 let (in2_send, input2) = node.sim_input::<_, TotalOrder, _>();
4660
4661 let out = input1
4662 .atomic()
4663 .merge_unordered(input2.atomic())
4664 .end_atomic()
4665 .sim_output();
4666
4667 flow.sim().exhaustive(async || {
4668 in1_send.send(1);
4669 in2_send.send(2);
4670
4671 out.assert_yields_only_unordered(vec![1, 2]).await;
4672 });
4673 }
4674
4675 #[cfg(feature = "deploy")]
4676 #[tokio::test]
4677 async fn test_stream_ref() {
4678 let mut deployment = Deployment::new();
4679
4680 let mut flow = FlowBuilder::new();
4681 let external = flow.external::<()>();
4682 let p1 = flow.process::<()>();
4683
4684 let my_stream = p1.source_iter(q!(1..=5i32));
4686
4687 let stream_ref = my_stream.by_ref();
4688
4689 let out_port = p1
4691 .source_iter(q!([()]))
4692 .map(q!(|_| stream_ref.len() as i32))
4693 .send_bincode_external(&external);
4694
4695 my_stream.for_each(q!(|_| {}));
4697
4698 let nodes = flow
4699 .with_default_optimize()
4700 .with_process(&p1, deployment.Localhost())
4701 .with_external(&external, deployment.Localhost())
4702 .deploy(&mut deployment);
4703
4704 deployment.deploy().await.unwrap();
4705
4706 let mut out_recv = nodes.connect(out_port).await;
4707
4708 deployment.start().await.unwrap();
4709
4710 let result = out_recv.next().await.unwrap();
4711 assert_eq!(result, 5);
4713 }
4714
4715 #[cfg(feature = "deploy")]
4716 #[tokio::test]
4717 async fn test_stream_ref_contents() {
4718 let mut deployment = Deployment::new();
4719
4720 let mut flow = FlowBuilder::new();
4721 let external = flow.external::<()>();
4722 let p1 = flow.process::<()>();
4723
4724 let my_stream = p1.source_iter(q!(1..=3i32));
4726
4727 let stream_ref = my_stream.by_ref();
4728
4729 let out_port = p1
4731 .source_iter(q!([()]))
4732 .map(q!(|_| stream_ref.iter().sum::<i32>()))
4733 .send_bincode_external(&external);
4734
4735 my_stream.for_each(q!(|_| {}));
4736
4737 let nodes = flow
4738 .with_default_optimize()
4739 .with_process(&p1, deployment.Localhost())
4740 .with_external(&external, deployment.Localhost())
4741 .deploy(&mut deployment);
4742
4743 deployment.deploy().await.unwrap();
4744
4745 let mut out_recv = nodes.connect(out_port).await;
4746
4747 deployment.start().await.unwrap();
4748
4749 let result = out_recv.next().await.unwrap();
4750 assert_eq!(result, 6);
4752 }
4753
4754 #[cfg(feature = "deploy")]
4755 #[tokio::test]
4756 async fn test_stream_ref_no_consumer() {
4757 let mut deployment = Deployment::new();
4758
4759 let mut flow = FlowBuilder::new();
4760 let external = flow.external::<()>();
4761 let p1 = flow.process::<()>();
4762
4763 let my_stream = p1.source_iter(q!(1..=4i32));
4765
4766 let stream_ref = my_stream.by_ref();
4767
4768 let out_port = p1
4769 .source_iter(q!([()]))
4770 .map(q!(|_| stream_ref.len() as i32))
4771 .send_bincode_external(&external);
4772
4773 let nodes = flow
4774 .with_default_optimize()
4775 .with_process(&p1, deployment.Localhost())
4776 .with_external(&external, deployment.Localhost())
4777 .deploy(&mut deployment);
4778
4779 deployment.deploy().await.unwrap();
4780
4781 let mut out_recv = nodes.connect(out_port).await;
4782
4783 deployment.start().await.unwrap();
4784
4785 let result = out_recv.next().await.unwrap();
4786 assert_eq!(result, 4);
4787 }
4788
4789 #[cfg(feature = "deploy")]
4790 #[tokio::test]
4791 async fn test_stream_mut() {
4792 let mut deployment = Deployment::new();
4793
4794 let mut flow = FlowBuilder::new();
4795 let external = flow.external::<()>();
4796 let p1 = flow.process::<()>();
4797
4798 let my_stream = p1.source_iter(q!(1..=5i32));
4800
4801 let stream_mut = my_stream.by_mut();
4802
4803 let out_port = p1
4805 .source_iter(q!([()]))
4806 .map(q!(|_| {
4807 stream_mut.retain(|x| *x > 3);
4808 stream_mut.len() as i32
4809 }))
4810 .send_bincode_external(&external);
4811
4812 my_stream.for_each(q!(|_| {}));
4813
4814 let nodes = flow
4815 .with_default_optimize()
4816 .with_process(&p1, deployment.Localhost())
4817 .with_external(&external, deployment.Localhost())
4818 .deploy(&mut deployment);
4819
4820 deployment.deploy().await.unwrap();
4821
4822 let mut out_recv = nodes.connect(out_port).await;
4823
4824 deployment.start().await.unwrap();
4825
4826 let result = out_recv.next().await.unwrap();
4827 assert_eq!(result, 2);
4829 }
4830}