Skip to main content

hydro_lang/live_collections/stream/
networking.rs

1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, MinOrder, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19use crate::location::cluster::{ClusterIds, Consistency, EventualConsistency, NoConsistency};
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::DynLocation;
22use crate::location::external_process::ExternalBincodeStream;
23use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, Process};
24use crate::networking::{NetworkFor, TCP};
25use crate::nondet::{NonDet, nondet};
26use crate::properties::manual_proof;
27#[cfg(feature = "sim")]
28use crate::sim::SimReceiver;
29use crate::staging_util::get_this_crate;
30
31// same as the one in `hydro_std`, but internal use only
32fn track_membership<'a, C, L: Location<'a>>(
33    membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
34) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
35    membership.fold(
36        q!(|| false),
37        q!(|present, event| {
38            match event {
39                MembershipEvent::Joined => *present = true,
40                MembershipEvent::Left => *present = false,
41            }
42        }),
43    )
44}
45
46fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
47    let root = get_this_crate();
48
49    if is_demux {
50        parse_quote! {
51            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
52                |(id, data)| {
53                    (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
54                }
55            )
56        }
57    } else {
58        parse_quote! {
59            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
60                |data| {
61                    #root::runtime_support::bincode::serialize(&data).unwrap().into()
62                }
63            )
64        }
65    }
66}
67
68pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
69    serialize_bincode_with_type(is_demux, &quote_type::<T>())
70}
71
72fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
73    let root = get_this_crate();
74    if let Some(c_type) = tagged {
75        parse_quote! {
76            |res| {
77                let (id, b) = res.unwrap();
78                (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
79            }
80        }
81    } else {
82        parse_quote! {
83            |res| {
84                #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
85            }
86        }
87    }
88}
89
90pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
91    deserialize_bincode_with_type(tagged, &quote_type::<T>())
92}
93
94impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
95    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
96    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
97    /// using [`bincode`] to serialize/deserialize messages.
98    ///
99    /// The returned stream captures the elements received at the destination, where values will
100    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
101    /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
    /// recipient is guaranteed to receive a _prefix_ of the sent messages; if the TCP connection is
103    /// dropped no further messages will be sent.
104    ///
105    /// # Example
106    /// ```rust
107    /// # #[cfg(feature = "deploy")] {
108    /// # use hydro_lang::prelude::*;
109    /// # use futures::StreamExt;
110    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
111    /// let p1 = flow.process::<()>();
112    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
113    /// let p2 = flow.process::<()>();
114    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
115    /// // 1, 2, 3
116    /// # on_p2.send_bincode(&p_out)
117    /// # }, |mut stream| async move {
118    /// # for w in 1..=3 {
119    /// #     assert_eq!(stream.next().await, Some(w));
120    /// # }
121    /// # }));
122    /// # }
123    /// ```
124    pub fn send_bincode<L2>(
125        self,
126        other: &Process<'a, L2>,
127    ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
128    where
129        T: Serialize + DeserializeOwned,
130    {
131        self.send(other, TCP.fail_stop().bincode())
132    }
133
134    /// "Moves" elements of this stream to a new distributed location by sending them over the network,
135    /// using the configuration in `via` to set up the message transport.
136    ///
137    /// The returned stream captures the elements received at the destination, where values will
138    /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
139    /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
    /// The recipient is guaranteed to receive a _prefix_ of the sent messages; if the connection is
141    /// dropped no further messages will be sent.
142    ///
143    /// # Example
144    /// ```rust
145    /// # #[cfg(feature = "deploy")] {
146    /// # use hydro_lang::prelude::*;
147    /// # use futures::StreamExt;
148    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
149    /// let p1 = flow.process::<()>();
150    /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
151    /// let p2 = flow.process::<()>();
152    /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
153    /// // 1, 2, 3
154    /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
155    /// # }, |mut stream| async move {
156    /// # for w in 1..=3 {
157    /// #     assert_eq!(stream.next().await, Some(w));
158    /// # }
159    /// # }));
160    /// # }
161    /// ```
162    pub fn send<L2, N: NetworkFor<T>>(
163        self,
164        to: &Process<'a, L2>,
165        via: N,
166    ) -> Stream<T, Process<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
167    where
168        T: Serialize + DeserializeOwned,
169        O: MinOrder<N::OrderingGuarantee>,
170    {
171        let serialize_pipeline = Some(N::serialize_thunk(false));
172        let deserialize_pipeline = Some(N::deserialize_thunk(None));
173
174        let name = via.name();
175        if to.multiversioned() && name.is_none() {
176            panic!(
177                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
178            );
179        }
180
181        Stream::new(
182            to.clone(),
183            HydroNode::Network {
184                name: name.map(ToOwned::to_owned),
185                networking_info: N::networking_info(),
186                serialize_fn: serialize_pipeline.map(|e| e.into()),
187                instantiate_fn: DebugInstantiate::Building,
188                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
189                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
190                metadata: to.new_node_metadata(Stream::<
191                    T,
192                    Process<'a, L2>,
193                    Unbounded,
194                    <O as MinOrder<N::OrderingGuarantee>>::Min,
195                    R,
196                >::collection_kind()),
197            },
198        )
199    }
200
201    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
202    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
203    /// using [`bincode`] to serialize/deserialize messages.
204    ///
205    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
206    /// membership information. This is a common pattern in distributed systems for broadcasting data to
207    /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
208    /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
209    /// each element to all cluster members.
210    ///
211    /// # Non-Determinism
212    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
213    /// to the current cluster members _at that point in time_. Depending on when we are notified of
214    /// membership changes, we will broadcast each element to different members.
215    ///
216    /// # Example
217    /// ```rust
218    /// # #[cfg(feature = "deploy")] {
219    /// # use hydro_lang::prelude::*;
220    /// # use futures::StreamExt;
221    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
222    /// let p1 = flow.process::<()>();
223    /// let workers: Cluster<()> = flow.cluster::<()>();
224    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
225    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
226    /// # on_worker.send_bincode(&p2).entries()
227    /// // if there are 4 members in the cluster, each receives one element
228    /// // - MemberId::<()>(0): [123]
229    /// // - MemberId::<()>(1): [123]
230    /// // - MemberId::<()>(2): [123]
231    /// // - MemberId::<()>(3): [123]
232    /// # }, |mut stream| async move {
233    /// # let mut results = Vec::new();
234    /// # for w in 0..4 {
235    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
236    /// # }
237    /// # results.sort();
238    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
239    /// # }));
240    /// # }
241    /// ```
242    pub fn broadcast_bincode<L2: 'a>(
243        self,
244        other: &Cluster<'a, L2>,
245        nondet_membership: NonDet,
246    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
247    where
248        T: Clone + Serialize + DeserializeOwned,
249    {
250        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
251    }
252
253    /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
254    /// using the configuration in `via` to set up the message transport.
255    ///
256    /// Each element in the stream will be sent to **every** member of the cluster based on the latest
257    /// membership information. This is a common pattern in distributed systems for broadcasting data to
258    /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
259    /// target specific members, `broadcast` takes a stream of **only data elements** and sends
260    /// each element to all cluster members.
261    ///
262    /// # Non-Determinism
263    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
264    /// to the current cluster members _at that point in time_. Depending on when we are notified of
265    /// membership changes, we will broadcast each element to different members.
266    ///
267    /// # Example
268    /// ```rust
269    /// # #[cfg(feature = "deploy")] {
270    /// # use hydro_lang::prelude::*;
271    /// # use futures::StreamExt;
272    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
273    /// let p1 = flow.process::<()>();
274    /// let workers: Cluster<()> = flow.cluster::<()>();
275    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
276    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
277    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
278    /// // if there are 4 members in the cluster, each receives one element
279    /// // - MemberId::<()>(0): [123]
280    /// // - MemberId::<()>(1): [123]
281    /// // - MemberId::<()>(2): [123]
282    /// // - MemberId::<()>(3): [123]
283    /// # }, |mut stream| async move {
284    /// # let mut results = Vec::new();
285    /// # for w in 0..4 {
286    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
287    /// # }
288    /// # results.sort();
289    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
290    /// # }));
291    /// # }
292    /// ```
293    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
294        self,
295        to: &Cluster<'a, L2>,
296        via: N,
297        nondet_membership: NonDet,
298    ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
299    where
300        T: Clone + Serialize + DeserializeOwned,
301        O: MinOrder<N::OrderingGuarantee>,
302    {
303        let ids = track_membership(self.location.source_cluster_membership_stream(
304            to,
305            nondet!(/** dropped prefixes don't affect broadcast */),
306        ));
307        sliced! {
308            let members_snapshot = use(ids, nondet_membership);
309            let elements = use(self, nondet_membership);
310
311            let current_members = members_snapshot.filter(q!(|b| *b));
312            elements.repeat_with_keys(current_members)
313        }
314        .demux(to, via)
315    }
316
317    #[expect(clippy::type_complexity, reason = "guarantees eventual consistency")]
318    /// Broadcasts elements of this stream to all members of a cluster,
319    /// assuming membership is closed (fixed at deploy time).
320    ///
321    /// Unlike [`Stream::broadcast`], this does not require a [`NonDet`] guard.
322    /// The membership set is obtained from deploy metadata via
323    /// [`ClusterIds`], producing a
324    /// `Bounded` stream. The cross-product of data × members is fully
325    /// deterministic.
326    ///
327    /// This is only available in deployment targets with static cluster
328    /// membership (legacy Hydro Deploy and simulation). There are no late
329    /// joiners in that context, so broadcast receivers are guaranteed to
330    /// get data from the start of the stream. On dynamic targets
331    /// (e.g. ECS), use [`Stream::broadcast`] instead.
332    ///
333    /// # Example
334    /// ```rust
335    /// # #[cfg(feature = "deploy")] {
336    /// # use hydro_lang::prelude::*;
337    /// # use futures::StreamExt;
338    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
339    /// let p1 = flow.process::<()>();
340    /// let workers: Cluster<()> = flow.cluster::<()>();
341    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
342    /// let on_worker = numbers.broadcast_closed(&workers, TCP.fail_stop().bincode());
343    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
344    /// // each of the 4 cluster members receives 123
345    /// # }, |mut stream| async move {
346    /// # let mut results = Vec::new();
347    /// # for _ in 0..4 {
348    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
349    /// # }
350    /// # results.sort();
351    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
352    /// # }));
353    /// # }
354    /// ```
355    pub fn broadcast_closed<L2: 'a, N: NetworkFor<T>>(
356        self,
357        to: &Cluster<'a, L2>,
358        via: N,
359    ) -> Stream<
360        T,
361        Cluster<'a, L2, EventualConsistency>,
362        Unbounded,
363        <O as MinOrder<N::OrderingGuarantee>>::Min,
364        R,
365    >
366    where
367        T: Clone + Serialize + DeserializeOwned,
368        O: MinOrder<N::OrderingGuarantee>,
369    {
370        let cluster_ids = ClusterIds {
371            key: to.key,
372            _phantom: PhantomData,
373        };
374        let member_ids = self.location.source_iter(q!(cluster_ids
375            .iter()
376            .map(|id| MemberId::from_tagless(id.clone()))));
377
378        // Late joiners will receive no data from this broadcast, which is
379        // future-monotone and eventually consistent (a safe under-approximation).
380        self.cross_product(member_ids.weaken_retries())
381            .map(q!(|(data, member_id)| (member_id, data)))
382            .into_keyed()
383            .demux(to, via)
384            .assert_has_consistency_of_trusted(manual_proof!(/** closed broadcast will materialze the same elements on each member */))
385    }
386
387    /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
388    /// serialization. The external process can receive these elements by establishing a TCP
389    /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
390    ///
391    /// # Example
392    /// ```rust
393    /// # #[cfg(feature = "deploy")] {
394    /// # use hydro_lang::prelude::*;
395    /// # use futures::StreamExt;
396    /// # tokio_test::block_on(async move {
397    /// let mut flow = FlowBuilder::new();
398    /// let process = flow.process::<()>();
399    /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
400    /// let external = flow.external::<()>();
401    /// let external_handle = numbers.send_bincode_external(&external);
402    ///
403    /// let mut deployment = hydro_deploy::Deployment::new();
404    /// let nodes = flow
405    ///     .with_process(&process, deployment.Localhost())
406    ///     .with_external(&external, deployment.Localhost())
407    ///     .deploy(&mut deployment);
408    ///
409    /// deployment.deploy().await.unwrap();
410    /// // establish the TCP connection
411    /// let mut external_recv_stream = nodes.connect(external_handle).await;
412    /// deployment.start().await.unwrap();
413    ///
414    /// for w in 1..=3 {
415    ///     assert_eq!(external_recv_stream.next().await, Some(w));
416    /// }
417    /// # });
418    /// # }
419    /// ```
420    pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
421    where
422        T: Serialize + DeserializeOwned,
423    {
424        let serialize_pipeline = Some(serialize_bincode::<T>(false));
425
426        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
427
428        let external_port_id = flow_state_borrow.next_external_port();
429
430        flow_state_borrow.push_root(HydroRoot::SendExternal {
431            to_external_key: other.key,
432            to_port_id: external_port_id,
433            to_many: false,
434            unpaired: true,
435            serialize_fn: serialize_pipeline.map(|e| e.into()),
436            instantiate_fn: DebugInstantiate::Building,
437            input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
438            op_metadata: HydroIrOpMetadata::new(),
439        });
440
441        ExternalBincodeStream {
442            process_key: other.key,
443            port_id: external_port_id,
444            _phantom: PhantomData,
445        }
446    }
447
448    #[cfg(feature = "sim")]
449    /// Sets up a simulation output port for this stream, allowing test code to receive elements
450    /// sent to this stream during simulation.
451    pub fn sim_output(self) -> SimReceiver<T, O, R>
452    where
453        T: Serialize + DeserializeOwned,
454    {
455        let external_location: External<'a, ()> = External {
456            key: LocationKey::FIRST,
457            flow_state: self.location.flow_state().clone(),
458            _phantom: PhantomData,
459        };
460
461        let external = self.send_bincode_external(&external_location);
462
463        SimReceiver(external.port_id, PhantomData)
464    }
465}
466
467impl<'a, T, L: Location<'a>, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce> {
468    /// Creates an external output for embedded deployment mode.
469    ///
470    /// The `name` parameter specifies the name of the field in the generated
471    /// `EmbeddedOutputs` struct that will receive elements from this stream.
472    /// The generated function will accept an `EmbeddedOutputs` struct with an
473    /// `impl FnMut(T)` field with this name.
474    pub fn embedded_output(self, name: impl Into<String>) {
475        let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
476
477        self.location
478            .flow_state()
479            .borrow_mut()
480            .push_root(HydroRoot::EmbeddedOutput {
481                ident,
482                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
483                op_metadata: HydroIrOpMetadata::new(),
484            });
485    }
486}
487
488impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
489    Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
490{
491    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
492    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
493    /// using [`bincode`] to serialize/deserialize messages.
494    ///
495    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
496    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
497    /// this API allows precise targeting of specific cluster members rather than broadcasting to
498    /// all members.
499    ///
500    /// # Example
501    /// ```rust
502    /// # #[cfg(feature = "deploy")] {
503    /// # use hydro_lang::prelude::*;
504    /// # use futures::StreamExt;
505    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
506    /// let p1 = flow.process::<()>();
507    /// let workers: Cluster<()> = flow.cluster::<()>();
508    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
509    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
510    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
511    ///     .demux_bincode(&workers);
512    /// # on_worker.send_bincode(&p2).entries()
513    /// // if there are 4 members in the cluster, each receives one element
514    /// // - MemberId::<()>(0): [0]
515    /// // - MemberId::<()>(1): [1]
516    /// // - MemberId::<()>(2): [2]
517    /// // - MemberId::<()>(3): [3]
518    /// # }, |mut stream| async move {
519    /// # let mut results = Vec::new();
520    /// # for w in 0..4 {
521    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
522    /// # }
523    /// # results.sort();
524    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
525    /// # }));
526    /// # }
527    /// ```
528    pub fn demux_bincode(
529        self,
530        other: &Cluster<'a, L2>,
531    ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
532    where
533        T: Serialize + DeserializeOwned,
534    {
535        self.demux(other, TCP.fail_stop().bincode())
536    }
537
538    /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
539    /// using the configuration in `via` to set up the message transport.
540    ///
541    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
542    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
543    /// this API allows precise targeting of specific cluster members rather than broadcasting to
544    /// all members.
545    ///
546    /// # Example
547    /// ```rust
548    /// # #[cfg(feature = "deploy")] {
549    /// # use hydro_lang::prelude::*;
550    /// # use futures::StreamExt;
551    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
552    /// let p1 = flow.process::<()>();
553    /// let workers: Cluster<()> = flow.cluster::<()>();
554    /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
555    /// let on_worker: Stream<_, Cluster<_>, _> = numbers
556    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
557    ///     .demux(&workers, TCP.fail_stop().bincode());
558    /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
559    /// // if there are 4 members in the cluster, each receives one element
560    /// // - MemberId::<()>(0): [0]
561    /// // - MemberId::<()>(1): [1]
562    /// // - MemberId::<()>(2): [2]
563    /// // - MemberId::<()>(3): [3]
564    /// # }, |mut stream| async move {
565    /// # let mut results = Vec::new();
566    /// # for w in 0..4 {
567    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
568    /// # }
569    /// # results.sort();
570    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
571    /// # }));
572    /// # }
573    /// ```
574    #[expect(clippy::type_complexity, reason = "DropConsistency type")]
575    pub fn demux<N: NetworkFor<T>>(
576        self,
577        to: &Cluster<'a, L2>,
578        via: N,
579    ) -> Stream<
580        T,
581        Cluster<'a, L2, NoConsistency>,
582        Unbounded,
583        <O as MinOrder<N::OrderingGuarantee>>::Min,
584        R,
585    >
586    where
587        T: Serialize + DeserializeOwned,
588        O: MinOrder<N::OrderingGuarantee>,
589    {
590        self.into_keyed().demux(to, via)
591    }
592}
593
594impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
595    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
596    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
597    /// [`bincode`] to serialize/deserialize messages.
598    ///
599    /// This provides load balancing by evenly distributing work across cluster members. The
600    /// distribution is deterministic based on element order - the first element goes to member 0,
601    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
602    ///
603    /// # Non-Determinism
604    /// The set of cluster members may asynchronously change over time. Each element is distributed
605    /// based on the current cluster membership _at that point in time_. Depending on when cluster
606    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
607    /// membership is stable, the order of members in the round-robin pattern may change across runs.
608    ///
609    /// # Ordering Requirements
610    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
611    /// order of messages and retries affects the round-robin pattern.
612    ///
613    /// # Example
614    /// ```rust
615    /// # #[cfg(feature = "deploy")] {
616    /// # use hydro_lang::prelude::*;
617    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
618    /// # use futures::StreamExt;
619    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
620    /// let p1 = flow.process::<()>();
621    /// let workers: Cluster<()> = flow.cluster::<()>();
622    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
623    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
624    /// on_worker.send_bincode(&p2)
625    /// # .first().values() // we use first to assert that each member gets one element
626    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
627    /// // - MemberId::<()>(?): [1]
628    /// // - MemberId::<()>(?): [2]
629    /// // - MemberId::<()>(?): [3]
630    /// // - MemberId::<()>(?): [4]
631    /// # }, |mut stream| async move {
632    /// # let mut results = Vec::new();
633    /// # for w in 0..4 {
634    /// #     results.push(stream.next().await.unwrap());
635    /// # }
636    /// # results.sort();
637    /// # assert_eq!(results, vec![1, 2, 3, 4]);
638    /// # }));
639    /// # }
640    /// ```
641    pub fn round_robin_bincode<L2: 'a>(
642        self,
643        other: &Cluster<'a, L2>,
644        nondet_membership: NonDet,
645    ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
646    where
647        T: Serialize + DeserializeOwned,
648    {
649        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
650    }
651
652    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
653    /// the configuration in `via` to set up the message transport.
654    ///
655    /// This provides load balancing by evenly distributing work across cluster members. The
656    /// distribution is deterministic based on element order - the first element goes to member 0,
657    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
658    ///
659    /// # Non-Determinism
660    /// The set of cluster members may asynchronously change over time. Each element is distributed
661    /// based on the current cluster membership _at that point in time_. Depending on when cluster
662    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
663    /// membership is stable, the order of members in the round-robin pattern may change across runs.
664    ///
665    /// # Ordering Requirements
666    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
667    /// order of messages and retries affects the round-robin pattern.
668    ///
669    /// # Example
670    /// ```rust
671    /// # #[cfg(feature = "deploy")] {
672    /// # use hydro_lang::prelude::*;
673    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
674    /// # use futures::StreamExt;
675    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
676    /// let p1 = flow.process::<()>();
677    /// let workers: Cluster<()> = flow.cluster::<()>();
678    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
679    /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
680    /// on_worker.send(&p2, TCP.fail_stop().bincode())
681    /// # .first().values() // we use first to assert that each member gets one element
682    /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
683    /// // - MemberId::<()>(?): [1]
684    /// // - MemberId::<()>(?): [2]
685    /// // - MemberId::<()>(?): [3]
686    /// // - MemberId::<()>(?): [4]
687    /// # }, |mut stream| async move {
688    /// # let mut results = Vec::new();
689    /// # for w in 0..4 {
690    /// #     results.push(stream.next().await.unwrap());
691    /// # }
692    /// # results.sort();
693    /// # assert_eq!(results, vec![1, 2, 3, 4]);
694    /// # }));
695    /// # }
696    /// ```
697    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
698        self,
699        to: &Cluster<'a, L2>,
700        via: N,
701        nondet_membership: NonDet,
702    ) -> Stream<T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
703    where
704        T: Serialize + DeserializeOwned,
705    {
706        let ids = track_membership(self.location.source_cluster_membership_stream(
707            to,
708            nondet!(/** dropped prefixes don't affect broadcast */),
709        ));
710        sliced! {
711            let members_snapshot = use(ids, nondet_membership);
712            let elements = use(self.enumerate(), nondet_membership);
713
714            let current_members = members_snapshot
715                .filter(q!(|b| *b))
716                .keys()
717                .assume_ordering::<TotalOrder>(nondet_membership)
718                .collect_vec();
719
720            elements
721                .cross_singleton(current_members)
722                .filter_map(q!(|(data, members)| {
723                    if members.is_empty() {
724                        None
725                    } else {
726                        Some((members[data.0 % members.len()].clone(), data.1))
727                    }
728                }))
729        }
730        .demux(to, via)
731    }
732}
733
734impl<'a, T, L, B: Boundedness, C: Consistency>
735    Stream<T, Cluster<'a, L, C>, B, TotalOrder, ExactlyOnce>
736{
737    #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
738    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
739    /// [`bincode`] to serialize/deserialize messages.
740    ///
741    /// This provides load balancing by evenly distributing work across cluster members. The
742    /// distribution is deterministic based on element order - the first element goes to member 0,
743    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
744    ///
745    /// # Non-Determinism
746    /// The set of cluster members may asynchronously change over time. Each element is distributed
747    /// based on the current cluster membership _at that point in time_. Depending on when cluster
748    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
749    /// membership is stable, the order of members in the round-robin pattern may change across runs.
750    ///
751    /// # Ordering Requirements
752    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
753    /// order of messages and retries affects the round-robin pattern.
754    ///
755    /// # Example
756    /// ```rust
757    /// # #[cfg(feature = "deploy")] {
758    /// # use hydro_lang::prelude::*;
759    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
760    /// # use hydro_lang::location::MemberId;
761    /// # use futures::StreamExt;
762    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
763    /// let p1 = flow.process::<()>();
764    /// let workers1: Cluster<()> = flow.cluster::<()>();
765    /// let workers2: Cluster<()> = flow.cluster::<()>();
766    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
767    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
768    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
769    /// on_worker2.send_bincode(&p2)
770    /// # .entries()
771    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
772    /// # }, |mut stream| async move {
773    /// # let mut results = Vec::new();
774    /// # let mut locations = std::collections::HashSet::new();
775    /// # for w in 0..=16 {
776    /// #     let (location, v) = stream.next().await.unwrap();
777    /// #     locations.insert(location);
778    /// #     results.push(v);
779    /// # }
780    /// # results.sort();
781    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
782    /// # assert_eq!(locations.len(), 16);
783    /// # }));
784    /// # }
785    /// ```
786    pub fn round_robin_bincode<L2: 'a>(
787        self,
788        other: &Cluster<'a, L2>,
789        nondet_membership: NonDet,
790    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
791    where
792        T: Serialize + DeserializeOwned,
793    {
794        self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
795    }
796
797    /// Distributes elements of this stream to cluster members in a round-robin fashion, using
798    /// the configuration in `via` to set up the message transport.
799    ///
800    /// This provides load balancing by evenly distributing work across cluster members. The
801    /// distribution is deterministic based on element order - the first element goes to member 0,
802    /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
803    ///
804    /// # Non-Determinism
805    /// The set of cluster members may asynchronously change over time. Each element is distributed
806    /// based on the current cluster membership _at that point in time_. Depending on when cluster
807    /// members join and leave, the round-robin pattern will change. Furthermore, even when the
808    /// membership is stable, the order of members in the round-robin pattern may change across runs.
809    ///
810    /// # Ordering Requirements
811    /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
812    /// order of messages and retries affects the round-robin pattern.
813    ///
814    /// # Example
815    /// ```rust
816    /// # #[cfg(feature = "deploy")] {
817    /// # use hydro_lang::prelude::*;
818    /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
819    /// # use hydro_lang::location::MemberId;
820    /// # use futures::StreamExt;
821    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
822    /// let p1 = flow.process::<()>();
823    /// let workers1: Cluster<()> = flow.cluster::<()>();
824    /// let workers2: Cluster<()> = flow.cluster::<()>();
825    /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
826    /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
827    /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
828    /// on_worker2.send(&p2, TCP.fail_stop().bincode())
829    /// # .entries()
830    /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
831    /// # }, |mut stream| async move {
832    /// # let mut results = Vec::new();
833    /// # let mut locations = std::collections::HashSet::new();
834    /// # for w in 0..=16 {
835    /// #     let (location, v) = stream.next().await.unwrap();
836    /// #     locations.insert(location);
837    /// #     results.push(v);
838    /// # }
839    /// # results.sort();
840    /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
841    /// # assert_eq!(locations.len(), 16);
842    /// # }));
843    /// # }
844    /// ```
845    pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
846        self,
847        to: &Cluster<'a, L2>,
848        via: N,
849        nondet_membership: NonDet,
850    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
851    where
852        T: Serialize + DeserializeOwned,
853    {
854        let ids = track_membership(self.location.source_cluster_membership_stream(
855            to,
856            nondet!(/** dropped prefixes don't affect broadcast */),
857        ));
858        sliced! {
859            let members_snapshot = use(ids, nondet_membership);
860            let elements = use(self.enumerate(), nondet_membership);
861
862            let current_members = members_snapshot
863                .filter(q!(|b| *b))
864                .keys()
865                .assume_ordering::<TotalOrder>(nondet_membership)
866                .collect_vec();
867
868            elements
869                .cross_singleton(current_members)
870                .filter_map(q!(|(data, members)| {
871                    if members.is_empty() {
872                        None
873                    } else {
874                        Some((members[data.0 % members.len()].clone(), data.1))
875                    }
876                }))
877        }
878        .demux(to, via)
879    }
880}
881
882impl<'a, T, L, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
883    Stream<T, Cluster<'a, L, C>, B, O, R>
884{
885    #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
886    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
887    /// using [`bincode`] to serialize/deserialize messages.
888    ///
889    /// Each cluster member sends its local stream elements, and they are collected at the destination
890    /// as a [`KeyedStream`] where keys identify the source cluster member.
891    ///
892    /// # Example
893    /// ```rust
894    /// # #[cfg(feature = "deploy")] {
895    /// # use hydro_lang::prelude::*;
896    /// # use futures::StreamExt;
897    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
898    /// let workers: Cluster<()> = flow.cluster::<()>();
899    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
900    /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
901    /// # all_received.entries()
902    /// # }, |mut stream| async move {
903    /// // if there are 4 members in the cluster, we should receive 4 elements
904    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
905    /// # let mut results = Vec::new();
906    /// # for w in 0..4 {
907    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
908    /// # }
909    /// # results.sort();
910    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
911    /// # }));
912    /// # }
913    /// ```
914    ///
915    /// If you don't need to know the source for each element, you can use `.values()`
916    /// to get just the data:
917    /// ```rust
918    /// # #[cfg(feature = "deploy")] {
919    /// # use hydro_lang::prelude::*;
920    /// # use hydro_lang::live_collections::stream::NoOrder;
921    /// # use futures::StreamExt;
922    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
923    /// # let workers: Cluster<()> = flow.cluster::<()>();
924    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
925    /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
926    /// # values
927    /// # }, |mut stream| async move {
928    /// # let mut results = Vec::new();
929    /// # for w in 0..4 {
930    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
931    /// # }
932    /// # results.sort();
933    /// // if there are 4 members in the cluster, we should receive 4 elements
934    /// // 1, 1, 1, 1
935    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
936    /// # }));
937    /// # }
938    /// ```
939    pub fn send_bincode<L2>(
940        self,
941        other: &Process<'a, L2>,
942    ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
943    where
944        T: Serialize + DeserializeOwned,
945    {
946        self.send(other, TCP.fail_stop().bincode())
947    }
948
949    /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
950    /// using the configuration in `via` to set up the message transport.
951    ///
952    /// Each cluster member sends its local stream elements, and they are collected at the destination
953    /// as a [`KeyedStream`] where keys identify the source cluster member.
954    ///
955    /// # Example
956    /// ```rust
957    /// # #[cfg(feature = "deploy")] {
958    /// # use hydro_lang::prelude::*;
959    /// # use futures::StreamExt;
960    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
961    /// let workers: Cluster<()> = flow.cluster::<()>();
962    /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
963    /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
964    /// # all_received.entries()
965    /// # }, |mut stream| async move {
966    /// // if there are 4 members in the cluster, we should receive 4 elements
967    /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
968    /// # let mut results = Vec::new();
969    /// # for w in 0..4 {
970    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
971    /// # }
972    /// # results.sort();
973    /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
974    /// # }));
975    /// # }
976    /// ```
977    ///
978    /// If you don't need to know the source for each element, you can use `.values()`
979    /// to get just the data:
980    /// ```rust
981    /// # #[cfg(feature = "deploy")] {
982    /// # use hydro_lang::prelude::*;
983    /// # use hydro_lang::live_collections::stream::NoOrder;
984    /// # use futures::StreamExt;
985    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
986    /// # let workers: Cluster<()> = flow.cluster::<()>();
987    /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
988    /// let values: Stream<i32, _, _, NoOrder> =
989    ///     numbers.send(&process, TCP.fail_stop().bincode()).values();
990    /// # values
991    /// # }, |mut stream| async move {
992    /// # let mut results = Vec::new();
993    /// # for w in 0..4 {
994    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
995    /// # }
996    /// # results.sort();
997    /// // if there are 4 members in the cluster, we should receive 4 elements
998    /// // 1, 1, 1, 1
999    /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
1000    /// # }));
1001    /// # }
1002    /// ```
1003    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1004    pub fn send<L2, N: NetworkFor<T>>(
1005        self,
1006        to: &Process<'a, L2>,
1007        via: N,
1008    ) -> KeyedStream<
1009        MemberId<L>,
1010        T,
1011        Process<'a, L2>,
1012        Unbounded,
1013        <O as MinOrder<N::OrderingGuarantee>>::Min,
1014        R,
1015    >
1016    where
1017        T: Serialize + DeserializeOwned,
1018        O: MinOrder<N::OrderingGuarantee>,
1019    {
1020        let serialize_pipeline = Some(N::serialize_thunk(false));
1021
1022        let deserialize_pipeline = Some(N::deserialize_thunk(Some(&quote_type::<L>())));
1023
1024        let name = via.name();
1025        if to.multiversioned() && name.is_none() {
1026            panic!(
1027                "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
1028            );
1029        }
1030
1031        let raw_stream: Stream<
1032            (MemberId<L>, T),
1033            Process<'a, L2>,
1034            Unbounded,
1035            <O as MinOrder<N::OrderingGuarantee>>::Min,
1036            R,
1037        > = Stream::new(
1038            to.clone(),
1039            HydroNode::Network {
1040                name: name.map(ToOwned::to_owned),
1041                networking_info: N::networking_info(),
1042                serialize_fn: serialize_pipeline.map(|e| e.into()),
1043                instantiate_fn: DebugInstantiate::Building,
1044                deserialize_fn: deserialize_pipeline.map(|e| e.into()),
1045                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1046                metadata: to.new_node_metadata(Stream::<
1047                    (MemberId<L>, T),
1048                    Process<'a, L2>,
1049                    Unbounded,
1050                    <O as MinOrder<N::OrderingGuarantee>>::Min,
1051                    R,
1052                >::collection_kind()),
1053            },
1054        );
1055
1056        raw_stream.into_keyed()
1057    }
1058
1059    #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
1060    /// Broadcasts elements of this stream at each source member to all members of a destination
1061    /// cluster, using [`bincode`] to serialize/deserialize messages.
1062    ///
1063    /// Each source member sends each of its stream elements to **every** member of the cluster
1064    /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
1065    /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
1066    /// **only data elements** and sends each element to all cluster members.
1067    ///
1068    /// # Non-Determinism
1069    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1070    /// to the current cluster members known _at that point in time_ at the source member. Depending
1071    /// on when each source member is notified of membership changes, it will broadcast each element
1072    /// to different members.
1073    ///
1074    /// # Example
1075    /// ```rust
1076    /// # #[cfg(feature = "deploy")] {
1077    /// # use hydro_lang::prelude::*;
1078    /// # use hydro_lang::location::MemberId;
1079    /// # use futures::StreamExt;
1080    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1081    /// # type Source = ();
1082    /// # type Destination = ();
1083    /// let source: Cluster<Source> = flow.cluster::<Source>();
1084    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1085    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1086    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
1087    /// # on_destination.entries().send_bincode(&p2).entries()
1088    /// // if there are 4 members in the desination, each receives one element from each source member
1089    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1090    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1091    /// // - ...
1092    /// # }, |mut stream| async move {
1093    /// # let mut results = Vec::new();
1094    /// # for w in 0..16 {
1095    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1096    /// # }
1097    /// # results.sort();
1098    /// # assert_eq!(results, vec![
1099    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1100    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1101    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1102    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1103    /// # ]);
1104    /// # }));
1105    /// # }
1106    /// ```
1107    pub fn broadcast_bincode<L2: 'a>(
1108        self,
1109        other: &Cluster<'a, L2>,
1110        nondet_membership: NonDet,
1111    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1112    where
1113        T: Clone + Serialize + DeserializeOwned,
1114    {
1115        self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
1116    }
1117
1118    /// Broadcasts elements of this stream at each source member to all members of a destination
1119    /// cluster, using the configuration in `via` to set up the message transport.
1120    ///
1121    /// Each source member sends each of its stream elements to **every** member of the cluster
1122    /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1123    /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1124    /// **only data elements** and sends each element to all cluster members.
1125    ///
1126    /// # Non-Determinism
1127    /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1128    /// to the current cluster members known _at that point in time_ at the source member. Depending
1129    /// on when each source member is notified of membership changes, it will broadcast each element
1130    /// to different members.
1131    ///
1132    /// # Example
1133    /// ```rust
1134    /// # #[cfg(feature = "deploy")] {
1135    /// # use hydro_lang::prelude::*;
1136    /// # use hydro_lang::location::MemberId;
1137    /// # use futures::StreamExt;
1138    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1139    /// # type Source = ();
1140    /// # type Destination = ();
1141    /// let source: Cluster<Source> = flow.cluster::<Source>();
1142    /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1143    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1144    /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1145    /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1146    /// // if there are 4 members in the desination, each receives one element from each source member
1147    /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1148    /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1149    /// // - ...
1150    /// # }, |mut stream| async move {
1151    /// # let mut results = Vec::new();
1152    /// # for w in 0..16 {
1153    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1154    /// # }
1155    /// # results.sort();
1156    /// # assert_eq!(results, vec![
1157    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1158    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1159    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1160    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1161    /// # ]);
1162    /// # }));
1163    /// # }
1164    /// ```
1165    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1166    pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1167        self,
1168        to: &Cluster<'a, L2>,
1169        via: N,
1170        nondet_membership: NonDet,
1171    ) -> KeyedStream<
1172        MemberId<L>,
1173        T,
1174        Cluster<'a, L2>,
1175        Unbounded,
1176        <O as MinOrder<N::OrderingGuarantee>>::Min,
1177        R,
1178    >
1179    where
1180        T: Clone + Serialize + DeserializeOwned,
1181        O: MinOrder<N::OrderingGuarantee>,
1182    {
1183        let ids = track_membership(self.location.source_cluster_membership_stream(
1184            to,
1185            nondet!(/** dropped prefixes don't affect broadcast */),
1186        ));
1187        sliced! {
1188            let members_snapshot = use(ids, nondet_membership);
1189            let elements = use(self, nondet_membership);
1190
1191            let current_members = members_snapshot.filter(q!(|b| *b));
1192            elements.repeat_with_keys(current_members)
1193        }
1194        .demux(to, via)
1195    }
1196
1197    #[cfg(feature = "sim")]
1198    /// Sends elements of this cluster stream to an external location using bincode serialization.
1199    fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
1200    where
1201        T: Serialize + DeserializeOwned,
1202    {
1203        let serialize_pipeline = Some(serialize_bincode::<T>(false));
1204
1205        let mut flow_state_borrow = self.location.flow_state().borrow_mut();
1206
1207        let external_port_id = flow_state_borrow.next_external_port();
1208
1209        flow_state_borrow.push_root(HydroRoot::SendExternal {
1210            to_external_key: other.key,
1211            to_port_id: external_port_id,
1212            to_many: false,
1213            unpaired: true,
1214            serialize_fn: serialize_pipeline.map(|e| e.into()),
1215            instantiate_fn: DebugInstantiate::Building,
1216            input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1217            op_metadata: HydroIrOpMetadata::new(),
1218        });
1219
1220        ExternalBincodeStream {
1221            process_key: other.key,
1222            port_id: external_port_id,
1223            _phantom: PhantomData,
1224        }
1225    }
1226
1227    #[cfg(feature = "sim")]
1228    /// Sets up a simulation output port for this cluster stream, allowing test code
1229    /// to receive `(member_id, T)` pairs during simulation.
1230    pub fn sim_cluster_output(self) -> crate::sim::SimClusterReceiver<T, O, R>
1231    where
1232        T: Serialize + DeserializeOwned,
1233    {
1234        let external_location: External<'a, ()> = External {
1235            key: LocationKey::FIRST,
1236            flow_state: self.location.flow_state().clone(),
1237            _phantom: PhantomData,
1238        };
1239
1240        let external = self.send_bincode_external(&external_location);
1241
1242        crate::sim::SimClusterReceiver(external.port_id, PhantomData)
1243    }
1244}
1245
1246impl<'a, T, L, L2, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
1247    Stream<(MemberId<L2>, T), Cluster<'a, L, C>, B, O, R>
1248{
1249    #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1250    /// Sends elements of this stream at each source member to specific members of a destination
1251    /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1252    ///
1253    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1254    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1255    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1256    /// all members.
1257    ///
1258    /// Each cluster member sends its local stream elements, and they are collected at each
1259    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1260    ///
1261    /// # Example
1262    /// ```rust
1263    /// # #[cfg(feature = "deploy")] {
1264    /// # use hydro_lang::prelude::*;
1265    /// # use futures::StreamExt;
1266    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1267    /// # type Source = ();
1268    /// # type Destination = ();
1269    /// let source: Cluster<Source> = flow.cluster::<Source>();
1270    /// let to_send: Stream<_, Cluster<_>, _> = source
1271    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1272    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1273    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1274    /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1275    /// # all_received.entries().send_bincode(&p2).entries()
1276    /// # }, |mut stream| async move {
1277    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1278    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1279    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1280    /// // - ...
1281    /// # let mut results = Vec::new();
1282    /// # for w in 0..16 {
1283    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1284    /// # }
1285    /// # results.sort();
1286    /// # assert_eq!(results, vec![
1287    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1288    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1289    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1290    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1291    /// # ]);
1292    /// # }));
1293    /// # }
1294    /// ```
1295    pub fn demux_bincode(
1296        self,
1297        other: &Cluster<'a, L2>,
1298    ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1299    where
1300        T: Serialize + DeserializeOwned,
1301    {
1302        self.demux(other, TCP.fail_stop().bincode())
1303    }
1304
1305    /// Sends elements of this stream at each source member to specific members of a destination
1306    /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1307    /// message transport.
1308    ///
1309    /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1310    /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1311    /// this API allows precise targeting of specific cluster members rather than broadcasting to
1312    /// all members.
1313    ///
1314    /// Each cluster member sends its local stream elements, and they are collected at each
1315    /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1316    ///
1317    /// # Example
1318    /// ```rust
1319    /// # #[cfg(feature = "deploy")] {
1320    /// # use hydro_lang::prelude::*;
1321    /// # use futures::StreamExt;
1322    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1323    /// # type Source = ();
1324    /// # type Destination = ();
1325    /// let source: Cluster<Source> = flow.cluster::<Source>();
1326    /// let to_send: Stream<_, Cluster<_>, _> = source
1327    ///     .source_iter(q!(vec![0, 1, 2, 3]))
1328    ///     .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1329    /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1330    /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1331    /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1332    /// # }, |mut stream| async move {
1333    /// // if there are 4 members in the destination cluster, each receives one message from each source member
1334    /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1335    /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1336    /// // - ...
1337    /// # let mut results = Vec::new();
1338    /// # for w in 0..16 {
1339    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
1340    /// # }
1341    /// # results.sort();
1342    /// # assert_eq!(results, vec![
1343    /// #   "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1344    /// #   "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1345    /// #   "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1346    /// #   "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1347    /// # ]);
1348    /// # }));
1349    /// # }
1350    /// ```
1351    #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1352    pub fn demux<N: NetworkFor<T>>(
1353        self,
1354        to: &Cluster<'a, L2>,
1355        via: N,
1356    ) -> KeyedStream<
1357        MemberId<L>,
1358        T,
1359        Cluster<'a, L2, NoConsistency>,
1360        Unbounded,
1361        <O as MinOrder<N::OrderingGuarantee>>::Min,
1362        R,
1363    >
1364    where
1365        T: Serialize + DeserializeOwned,
1366        O: MinOrder<N::OrderingGuarantee>,
1367    {
1368        self.into_keyed().demux(to, via)
1369    }
1370}
1371
1372#[cfg(test)]
1373mod tests {
1374    #[cfg(feature = "sim")]
1375    use stageleft::q;
1376
1377    #[cfg(feature = "sim")]
1378    use crate::live_collections::sliced::sliced;
1379    #[cfg(feature = "sim")]
1380    use crate::location::{Location, MemberId};
1381    #[cfg(feature = "sim")]
1382    use crate::networking::TCP;
1383    #[cfg(feature = "sim")]
1384    use crate::nondet::nondet;
1385    #[cfg(feature = "sim")]
1386    use crate::prelude::FlowBuilder;
1387
1388    #[cfg(feature = "sim")]
1389    #[test]
1390    fn sim_send_bincode_o2o() {
1391        use crate::networking::TCP;
1392
1393        let mut flow = FlowBuilder::new();
1394        let node = flow.process::<()>();
1395        let node2 = flow.process::<()>();
1396
1397        let (in_send, input) = node.sim_input();
1398
1399        let out_recv = input
1400            .send(&node2, TCP.fail_stop().bincode())
1401            .batch(&node2.tick(), nondet!(/** test */))
1402            .count()
1403            .all_ticks()
1404            .sim_output();
1405
1406        let instances = flow.sim().exhaustive(async || {
1407            in_send.send(());
1408            in_send.send(());
1409            in_send.send(());
1410
1411            let received = out_recv.collect::<Vec<_>>().await;
1412            assert!(received.into_iter().sum::<usize>() == 3);
1413        });
1414
1415        assert_eq!(instances, 4); // 2^{3 - 1}
1416    }
1417
1418    #[cfg(feature = "sim")]
1419    #[test]
1420    fn sim_send_bincode_m2o() {
1421        let mut flow = FlowBuilder::new();
1422        let cluster = flow.cluster::<()>();
1423        let node = flow.process::<()>();
1424
1425        let input = cluster.source_iter(q!(vec![1]));
1426
1427        let out_recv = input
1428            .send(&node, TCP.fail_stop().bincode())
1429            .entries()
1430            .batch(&node.tick(), nondet!(/** test */))
1431            .all_ticks()
1432            .sim_output();
1433
1434        let instances = flow
1435            .sim()
1436            .with_cluster_size(&cluster, 4)
1437            .exhaustive(async || {
1438                out_recv
1439                    .assert_yields_only_unordered(vec![
1440                        (MemberId::from_raw_id(0), 1),
1441                        (MemberId::from_raw_id(1), 1),
1442                        (MemberId::from_raw_id(2), 1),
1443                        (MemberId::from_raw_id(3), 1),
1444                    ])
1445                    .await
1446            });
1447
1448        assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1449    }
1450
1451    #[cfg(feature = "sim")]
1452    #[test]
1453    fn sim_send_bincode_multiple_m2o() {
1454        let mut flow = FlowBuilder::new();
1455        let cluster1 = flow.cluster::<()>();
1456        let cluster2 = flow.cluster::<()>();
1457        let node = flow.process::<()>();
1458
1459        let out_recv_1 = cluster1
1460            .source_iter(q!(vec![1]))
1461            .send(&node, TCP.fail_stop().bincode())
1462            .entries()
1463            .sim_output();
1464
1465        let out_recv_2 = cluster2
1466            .source_iter(q!(vec![2]))
1467            .send(&node, TCP.fail_stop().bincode())
1468            .entries()
1469            .sim_output();
1470
1471        let instances = flow
1472            .sim()
1473            .with_cluster_size(&cluster1, 3)
1474            .with_cluster_size(&cluster2, 4)
1475            .exhaustive(async || {
1476                out_recv_1
1477                    .assert_yields_only_unordered(vec![
1478                        (MemberId::from_raw_id(0), 1),
1479                        (MemberId::from_raw_id(1), 1),
1480                        (MemberId::from_raw_id(2), 1),
1481                    ])
1482                    .await;
1483
1484                out_recv_2
1485                    .assert_yields_only_unordered(vec![
1486                        (MemberId::from_raw_id(0), 2),
1487                        (MemberId::from_raw_id(1), 2),
1488                        (MemberId::from_raw_id(2), 2),
1489                        (MemberId::from_raw_id(3), 2),
1490                    ])
1491                    .await;
1492            });
1493
1494        assert_eq!(instances, 1);
1495    }
1496
1497    #[cfg(feature = "sim")]
1498    #[test]
1499    fn sim_send_bincode_o2m() {
1500        let mut flow = FlowBuilder::new();
1501        let cluster = flow.cluster::<()>();
1502        let node = flow.process::<()>();
1503
1504        let input = node.source_iter(q!(vec![
1505            (MemberId::from_raw_id(0), 123),
1506            (MemberId::from_raw_id(1), 456),
1507        ]));
1508
1509        let out_recv = input
1510            .demux(&cluster, TCP.fail_stop().bincode())
1511            .map(q!(|x| x + 1))
1512            .send(&node, TCP.fail_stop().bincode())
1513            .entries()
1514            .sim_output();
1515
1516        flow.sim()
1517            .with_cluster_size(&cluster, 4)
1518            .exhaustive(async || {
1519                out_recv
1520                    .assert_yields_only_unordered(vec![
1521                        (MemberId::from_raw_id(0), 124),
1522                        (MemberId::from_raw_id(1), 457),
1523                    ])
1524                    .await
1525            });
1526    }
1527
1528    #[cfg(feature = "sim")]
1529    #[test]
1530    fn sim_broadcast_bincode_o2m() {
1531        let mut flow = FlowBuilder::new();
1532        let cluster = flow.cluster::<()>();
1533        let node = flow.process::<()>();
1534
1535        let input = node.source_iter(q!(vec![123, 456]));
1536
1537        let out_recv = input
1538            .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1539            .map(q!(|x| x + 1))
1540            .send(&node, TCP.fail_stop().bincode())
1541            .entries()
1542            .sim_output();
1543
1544        let mut c_1_produced = false;
1545        let mut c_2_produced = false;
1546        let mut c_1_saw_457_but_not_124 = false;
1547
1548        flow.sim()
1549            .with_cluster_size(&cluster, 2)
1550            .exhaustive(async || {
1551                let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1552
1553                // check that order is preserved
1554                if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1555                    assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1556                    c_1_produced = true;
1557                }
1558
1559                if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1560                    assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1561                    c_2_produced = true;
1562                }
1563
1564                if all_out.contains(&(MemberId::from_raw_id(0), 457))
1565                    && !all_out.contains(&(MemberId::from_raw_id(0), 124))
1566                {
1567                    c_1_saw_457_but_not_124 = true;
1568                }
1569            });
1570
1571        assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1572
1573        // in at least one execution, the cluster member received 457 but not 124, this tests
1574        // that the simulator properly explores dynamic membership additions (a member that joins after 123 is broadcast)
1575        assert!(c_1_saw_457_but_not_124);
1576    }
1577
1578    #[cfg(feature = "sim")]
1579    #[test]
1580    fn sim_send_bincode_m2m() {
1581        let mut flow = FlowBuilder::new();
1582        let cluster = flow.cluster::<()>();
1583        let node = flow.process::<()>();
1584
1585        let input = node.source_iter(q!(vec![
1586            (MemberId::from_raw_id(0), 123),
1587            (MemberId::from_raw_id(1), 456),
1588        ]));
1589
1590        let out_recv = input
1591            .demux(&cluster, TCP.fail_stop().bincode())
1592            .map(q!(|x| x + 1))
1593            .flat_map_ordered(q!(|x| vec![
1594                (MemberId::from_raw_id(0), x),
1595                (MemberId::from_raw_id(1), x),
1596            ]))
1597            .demux(&cluster, TCP.fail_stop().bincode())
1598            .entries()
1599            .send(&node, TCP.fail_stop().bincode())
1600            .entries()
1601            .sim_output();
1602
1603        flow.sim()
1604            .with_cluster_size(&cluster, 4)
1605            .exhaustive(async || {
1606                out_recv
1607                    .assert_yields_only_unordered(vec![
1608                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1609                        (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1610                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1611                        (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1612                    ])
1613                    .await
1614            });
1615    }
1616
1617    #[cfg(feature = "sim")]
1618    #[test]
1619    fn sim_lossy_delayed_forever_o2o() {
1620        use std::collections::HashSet;
1621
1622        use crate::properties::manual_proof;
1623
1624        let mut flow = FlowBuilder::new();
1625        let node = flow.process::<()>();
1626        let node2 = flow.process::<()>();
1627
1628        let received = node
1629            .source_iter(q!(0..3_u32))
1630            .send(&node2, TCP.lossy_delayed_forever().bincode())
1631            .fold(
1632                q!(|| std::collections::HashSet::<u32>::new()),
1633                q!(
1634                    |set, v| {
1635                        set.insert(v);
1636                    },
1637                    commutative = manual_proof!(/** set insert is commutative */)
1638                ),
1639            );
1640
1641        let out_recv = sliced! {
1642            let snapshot = use(received, nondet!(/** test */));
1643            snapshot.into_stream()
1644        }
1645        .sim_output();
1646
1647        let mut saw_non_contiguous = false;
1648
1649        flow.sim().test_safety_only().exhaustive(async || {
1650            let snapshots = out_recv.collect::<Vec<HashSet<u32>>>().await;
1651
1652            // Check each individual snapshot for a non-contiguous subset.
1653            for set in &snapshots {
1654                #[expect(clippy::disallowed_methods, reason = "min / max are deterministic")]
1655                if set.len() >= 2 && set.len() < 3 {
1656                    let min = *set.iter().min().unwrap();
1657                    let max = *set.iter().max().unwrap();
1658                    if set.len() < (max - min + 1) as usize {
1659                        saw_non_contiguous = true;
1660                    }
1661                }
1662            }
1663        });
1664
1665        assert!(
1666            saw_non_contiguous,
1667            "Expected at least one execution with a non-contiguous subset of inputs"
1668        );
1669    }
1670
1671    #[cfg(feature = "sim")]
1672    #[test]
1673    fn sim_broadcast_closed_o2m() {
1674        let mut flow = FlowBuilder::new();
1675        let cluster = flow.cluster::<()>();
1676        let node = flow.process::<()>();
1677
1678        let input = node.source_iter(q!(vec![123, 456]));
1679
1680        let out_recv = input
1681            .broadcast_closed(&cluster, TCP.fail_stop().bincode())
1682            .send(&node, TCP.fail_stop().bincode())
1683            .entries()
1684            .sim_output();
1685
1686        flow.sim()
1687            .with_cluster_size(&cluster, 2)
1688            .exhaustive(async || {
1689                out_recv
1690                    .assert_yields_only_unordered(vec![
1691                        (MemberId::from_raw_id(0), 123),
1692                        (MemberId::from_raw_id(0), 456),
1693                        (MemberId::from_raw_id(1), 123),
1694                        (MemberId::from_raw_id(1), 456),
1695                    ])
1696                    .await
1697            });
1698    }
1699}