hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, MinOrder, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19use crate::location::cluster::{ClusterIds, Consistency, EventualConsistency, NoConsistency};
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::DynLocation;
22use crate::location::external_process::ExternalBincodeStream;
23use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, Process};
24use crate::networking::{NetworkFor, TCP};
25use crate::nondet::{NonDet, nondet};
26use crate::properties::manual_proof;
27#[cfg(feature = "sim")]
28use crate::sim::SimReceiver;
29use crate::staging_util::get_this_crate;
30
31// same as the one in `hydro_std`, but internal use only
32fn track_membership<'a, C, L: Location<'a>>(
33 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
34) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
35 membership.fold(
36 q!(|| false),
37 q!(|present, event| {
38 match event {
39 MembershipEvent::Joined => *present = true,
40 MembershipEvent::Left => *present = false,
41 }
42 }),
43 )
44}
45
46fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
47 let root = get_this_crate();
48
49 if is_demux {
50 parse_quote! {
51 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
52 |(id, data)| {
53 (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
54 }
55 )
56 }
57 } else {
58 parse_quote! {
59 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
60 |data| {
61 #root::runtime_support::bincode::serialize(&data).unwrap().into()
62 }
63 )
64 }
65 }
66}
67
68pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
69 serialize_bincode_with_type(is_demux, "e_type::<T>())
70}
71
72fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
73 let root = get_this_crate();
74 if let Some(c_type) = tagged {
75 parse_quote! {
76 |res| {
77 let (id, b) = res.unwrap();
78 (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
79 }
80 }
81 } else {
82 parse_quote! {
83 |res| {
84 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
85 }
86 }
87 }
88}
89
90pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
91 deserialize_bincode_with_type(tagged, "e_type::<T>())
92}
93
94impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
95 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
96 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
97 /// using [`bincode`] to serialize/deserialize messages.
98 ///
99 /// The returned stream captures the elements received at the destination, where values will
100 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
101 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
102 /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
103 /// dropped no further messages will be sent.
104 ///
105 /// # Example
106 /// ```rust
107 /// # #[cfg(feature = "deploy")] {
108 /// # use hydro_lang::prelude::*;
109 /// # use futures::StreamExt;
110 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
111 /// let p1 = flow.process::<()>();
112 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
113 /// let p2 = flow.process::<()>();
114 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
115 /// // 1, 2, 3
116 /// # on_p2.send_bincode(&p_out)
117 /// # }, |mut stream| async move {
118 /// # for w in 1..=3 {
119 /// # assert_eq!(stream.next().await, Some(w));
120 /// # }
121 /// # }));
122 /// # }
123 /// ```
124 pub fn send_bincode<L2>(
125 self,
126 other: &Process<'a, L2>,
127 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
128 where
129 T: Serialize + DeserializeOwned,
130 {
131 self.send(other, TCP.fail_stop().bincode())
132 }
133
134 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
135 /// using the configuration in `via` to set up the message transport.
136 ///
137 /// The returned stream captures the elements received at the destination, where values will
138 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
139 /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
140 /// The recipient is guaranteed to receive a _prefix_ or the sent messages; if the connection is
141 /// dropped no further messages will be sent.
142 ///
143 /// # Example
144 /// ```rust
145 /// # #[cfg(feature = "deploy")] {
146 /// # use hydro_lang::prelude::*;
147 /// # use futures::StreamExt;
148 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
149 /// let p1 = flow.process::<()>();
150 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
151 /// let p2 = flow.process::<()>();
152 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
153 /// // 1, 2, 3
154 /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
155 /// # }, |mut stream| async move {
156 /// # for w in 1..=3 {
157 /// # assert_eq!(stream.next().await, Some(w));
158 /// # }
159 /// # }));
160 /// # }
161 /// ```
162 pub fn send<L2, N: NetworkFor<T>>(
163 self,
164 to: &Process<'a, L2>,
165 via: N,
166 ) -> Stream<T, Process<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
167 where
168 T: Serialize + DeserializeOwned,
169 O: MinOrder<N::OrderingGuarantee>,
170 {
171 let serialize_pipeline = Some(N::serialize_thunk(false));
172 let deserialize_pipeline = Some(N::deserialize_thunk(None));
173
174 let name = via.name();
175 if to.multiversioned() && name.is_none() {
176 panic!(
177 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
178 );
179 }
180
181 Stream::new(
182 to.clone(),
183 HydroNode::Network {
184 name: name.map(ToOwned::to_owned),
185 networking_info: N::networking_info(),
186 serialize_fn: serialize_pipeline.map(|e| e.into()),
187 instantiate_fn: DebugInstantiate::Building,
188 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
189 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
190 metadata: to.new_node_metadata(Stream::<
191 T,
192 Process<'a, L2>,
193 Unbounded,
194 <O as MinOrder<N::OrderingGuarantee>>::Min,
195 R,
196 >::collection_kind()),
197 },
198 )
199 }
200
201 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
202 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
203 /// using [`bincode`] to serialize/deserialize messages.
204 ///
205 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
206 /// membership information. This is a common pattern in distributed systems for broadcasting data to
207 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
208 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
209 /// each element to all cluster members.
210 ///
211 /// # Non-Determinism
212 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
213 /// to the current cluster members _at that point in time_. Depending on when we are notified of
214 /// membership changes, we will broadcast each element to different members.
215 ///
216 /// # Example
217 /// ```rust
218 /// # #[cfg(feature = "deploy")] {
219 /// # use hydro_lang::prelude::*;
220 /// # use futures::StreamExt;
221 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
222 /// let p1 = flow.process::<()>();
223 /// let workers: Cluster<()> = flow.cluster::<()>();
224 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
225 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
226 /// # on_worker.send_bincode(&p2).entries()
227 /// // if there are 4 members in the cluster, each receives one element
228 /// // - MemberId::<()>(0): [123]
229 /// // - MemberId::<()>(1): [123]
230 /// // - MemberId::<()>(2): [123]
231 /// // - MemberId::<()>(3): [123]
232 /// # }, |mut stream| async move {
233 /// # let mut results = Vec::new();
234 /// # for w in 0..4 {
235 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
236 /// # }
237 /// # results.sort();
238 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
239 /// # }));
240 /// # }
241 /// ```
242 pub fn broadcast_bincode<L2: 'a>(
243 self,
244 other: &Cluster<'a, L2>,
245 nondet_membership: NonDet,
246 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
247 where
248 T: Clone + Serialize + DeserializeOwned,
249 {
250 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
251 }
252
253 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
254 /// using the configuration in `via` to set up the message transport.
255 ///
256 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
257 /// membership information. This is a common pattern in distributed systems for broadcasting data to
258 /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
259 /// target specific members, `broadcast` takes a stream of **only data elements** and sends
260 /// each element to all cluster members.
261 ///
262 /// # Non-Determinism
263 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
264 /// to the current cluster members _at that point in time_. Depending on when we are notified of
265 /// membership changes, we will broadcast each element to different members.
266 ///
267 /// # Example
268 /// ```rust
269 /// # #[cfg(feature = "deploy")] {
270 /// # use hydro_lang::prelude::*;
271 /// # use futures::StreamExt;
272 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
273 /// let p1 = flow.process::<()>();
274 /// let workers: Cluster<()> = flow.cluster::<()>();
275 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
276 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
277 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
278 /// // if there are 4 members in the cluster, each receives one element
279 /// // - MemberId::<()>(0): [123]
280 /// // - MemberId::<()>(1): [123]
281 /// // - MemberId::<()>(2): [123]
282 /// // - MemberId::<()>(3): [123]
283 /// # }, |mut stream| async move {
284 /// # let mut results = Vec::new();
285 /// # for w in 0..4 {
286 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
287 /// # }
288 /// # results.sort();
289 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
290 /// # }));
291 /// # }
292 /// ```
293 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
294 self,
295 to: &Cluster<'a, L2>,
296 via: N,
297 nondet_membership: NonDet,
298 ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
299 where
300 T: Clone + Serialize + DeserializeOwned,
301 O: MinOrder<N::OrderingGuarantee>,
302 {
303 let ids = track_membership(self.location.source_cluster_membership_stream(
304 to,
305 nondet!(/** dropped prefixes don't affect broadcast */),
306 ));
307 sliced! {
308 let members_snapshot = use(ids, nondet_membership);
309 let elements = use(self, nondet_membership);
310
311 let current_members = members_snapshot.filter(q!(|b| *b));
312 elements.repeat_with_keys(current_members)
313 }
314 .demux(to, via)
315 }
316
317 #[expect(clippy::type_complexity, reason = "guarantees eventual consistency")]
318 /// Broadcasts elements of this stream to all members of a cluster,
319 /// assuming membership is closed (fixed at deploy time).
320 ///
321 /// Unlike [`Stream::broadcast`], this does not require a [`NonDet`] guard.
322 /// The membership set is obtained from deploy metadata via
323 /// [`ClusterIds`], producing a
324 /// `Bounded` stream. The cross-product of data × members is fully
325 /// deterministic.
326 ///
327 /// This is only available in deployment targets with static cluster
328 /// membership (legacy Hydro Deploy and simulation). There are no late
329 /// joiners in that context, so broadcast receivers are guaranteed to
330 /// get data from the start of the stream. On dynamic targets
331 /// (e.g. ECS), use [`Stream::broadcast`] instead.
332 ///
333 /// # Example
334 /// ```rust
335 /// # #[cfg(feature = "deploy")] {
336 /// # use hydro_lang::prelude::*;
337 /// # use futures::StreamExt;
338 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
339 /// let p1 = flow.process::<()>();
340 /// let workers: Cluster<()> = flow.cluster::<()>();
341 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
342 /// let on_worker = numbers.broadcast_closed(&workers, TCP.fail_stop().bincode());
343 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
344 /// // each of the 4 cluster members receives 123
345 /// # }, |mut stream| async move {
346 /// # let mut results = Vec::new();
347 /// # for _ in 0..4 {
348 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
349 /// # }
350 /// # results.sort();
351 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
352 /// # }));
353 /// # }
354 /// ```
355 pub fn broadcast_closed<L2: 'a, N: NetworkFor<T>>(
356 self,
357 to: &Cluster<'a, L2>,
358 via: N,
359 ) -> Stream<
360 T,
361 Cluster<'a, L2, EventualConsistency>,
362 Unbounded,
363 <O as MinOrder<N::OrderingGuarantee>>::Min,
364 R,
365 >
366 where
367 T: Clone + Serialize + DeserializeOwned,
368 O: MinOrder<N::OrderingGuarantee>,
369 {
370 let cluster_ids = ClusterIds {
371 key: to.key,
372 _phantom: PhantomData,
373 };
374 let member_ids = self.location.source_iter(q!(cluster_ids
375 .iter()
376 .map(|id| MemberId::from_tagless(id.clone()))));
377
378 // Late joiners will receive no data from this broadcast, which is
379 // future-monotone and eventually consistent (a safe under-approximation).
380 self.cross_product(member_ids)
381 .map(q!(|(data, member_id)| (member_id, data)))
382 .into_keyed()
383 .demux(to, via)
384 .assert_has_consistency_of_trusted(manual_proof!(/** closed broadcast will materialze the same elements on each member */))
385 }
386
387 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
388 /// serialization. The external process can receive these elements by establishing a TCP
389 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
390 ///
391 /// # Example
392 /// ```rust
393 /// # #[cfg(feature = "deploy")] {
394 /// # use hydro_lang::prelude::*;
395 /// # use futures::StreamExt;
396 /// # tokio_test::block_on(async move {
397 /// let mut flow = FlowBuilder::new();
398 /// let process = flow.process::<()>();
399 /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
400 /// let external = flow.external::<()>();
401 /// let external_handle = numbers.send_bincode_external(&external);
402 ///
403 /// let mut deployment = hydro_deploy::Deployment::new();
404 /// let nodes = flow
405 /// .with_process(&process, deployment.Localhost())
406 /// .with_external(&external, deployment.Localhost())
407 /// .deploy(&mut deployment);
408 ///
409 /// deployment.deploy().await.unwrap();
410 /// // establish the TCP connection
411 /// let mut external_recv_stream = nodes.connect(external_handle).await;
412 /// deployment.start().await.unwrap();
413 ///
414 /// for w in 1..=3 {
415 /// assert_eq!(external_recv_stream.next().await, Some(w));
416 /// }
417 /// # });
418 /// # }
419 /// ```
420 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
421 where
422 T: Serialize + DeserializeOwned,
423 {
424 let serialize_pipeline = Some(serialize_bincode::<T>(false));
425
426 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
427
428 let external_port_id = flow_state_borrow.next_external_port();
429
430 flow_state_borrow.push_root(HydroRoot::SendExternal {
431 to_external_key: other.key,
432 to_port_id: external_port_id,
433 to_many: false,
434 unpaired: true,
435 serialize_fn: serialize_pipeline.map(|e| e.into()),
436 instantiate_fn: DebugInstantiate::Building,
437 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
438 op_metadata: HydroIrOpMetadata::new(),
439 });
440
441 ExternalBincodeStream {
442 process_key: other.key,
443 port_id: external_port_id,
444 _phantom: PhantomData,
445 }
446 }
447
448 #[cfg(feature = "sim")]
449 /// Sets up a simulation output port for this stream, allowing test code to receive elements
450 /// sent to this stream during simulation.
451 pub fn sim_output(self) -> SimReceiver<T, O, R>
452 where
453 T: Serialize + DeserializeOwned,
454 {
455 let external_location: External<'a, ()> = External {
456 key: LocationKey::FIRST,
457 flow_state: self.location.flow_state().clone(),
458 _phantom: PhantomData,
459 };
460
461 let external = self.send_bincode_external(&external_location);
462
463 SimReceiver(external.port_id, PhantomData)
464 }
465}
466
467impl<'a, T, L: Location<'a>, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce> {
468 /// Creates an external output for embedded deployment mode.
469 ///
470 /// The `name` parameter specifies the name of the field in the generated
471 /// `EmbeddedOutputs` struct that will receive elements from this stream.
472 /// The generated function will accept an `EmbeddedOutputs` struct with an
473 /// `impl FnMut(T)` field with this name.
474 pub fn embedded_output(self, name: impl Into<String>) {
475 let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
476
477 self.location
478 .flow_state()
479 .borrow_mut()
480 .push_root(HydroRoot::EmbeddedOutput {
481 ident,
482 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
483 op_metadata: HydroIrOpMetadata::new(),
484 });
485 }
486}
487
488impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
489 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
490{
491 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
492 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
493 /// using [`bincode`] to serialize/deserialize messages.
494 ///
495 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
496 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
497 /// this API allows precise targeting of specific cluster members rather than broadcasting to
498 /// all members.
499 ///
500 /// # Example
501 /// ```rust
502 /// # #[cfg(feature = "deploy")] {
503 /// # use hydro_lang::prelude::*;
504 /// # use futures::StreamExt;
505 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
506 /// let p1 = flow.process::<()>();
507 /// let workers: Cluster<()> = flow.cluster::<()>();
508 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
509 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
510 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
511 /// .demux_bincode(&workers);
512 /// # on_worker.send_bincode(&p2).entries()
513 /// // if there are 4 members in the cluster, each receives one element
514 /// // - MemberId::<()>(0): [0]
515 /// // - MemberId::<()>(1): [1]
516 /// // - MemberId::<()>(2): [2]
517 /// // - MemberId::<()>(3): [3]
518 /// # }, |mut stream| async move {
519 /// # let mut results = Vec::new();
520 /// # for w in 0..4 {
521 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
522 /// # }
523 /// # results.sort();
524 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
525 /// # }));
526 /// # }
527 /// ```
528 pub fn demux_bincode(
529 self,
530 other: &Cluster<'a, L2>,
531 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
532 where
533 T: Serialize + DeserializeOwned,
534 {
535 self.demux(other, TCP.fail_stop().bincode())
536 }
537
538 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
539 /// using the configuration in `via` to set up the message transport.
540 ///
541 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
542 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
543 /// this API allows precise targeting of specific cluster members rather than broadcasting to
544 /// all members.
545 ///
546 /// # Example
547 /// ```rust
548 /// # #[cfg(feature = "deploy")] {
549 /// # use hydro_lang::prelude::*;
550 /// # use futures::StreamExt;
551 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
552 /// let p1 = flow.process::<()>();
553 /// let workers: Cluster<()> = flow.cluster::<()>();
554 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
555 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
556 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
557 /// .demux(&workers, TCP.fail_stop().bincode());
558 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
559 /// // if there are 4 members in the cluster, each receives one element
560 /// // - MemberId::<()>(0): [0]
561 /// // - MemberId::<()>(1): [1]
562 /// // - MemberId::<()>(2): [2]
563 /// // - MemberId::<()>(3): [3]
564 /// # }, |mut stream| async move {
565 /// # let mut results = Vec::new();
566 /// # for w in 0..4 {
567 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
568 /// # }
569 /// # results.sort();
570 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
571 /// # }));
572 /// # }
573 /// ```
574 #[expect(clippy::type_complexity, reason = "DropConsistency type")]
575 pub fn demux<N: NetworkFor<T>>(
576 self,
577 to: &Cluster<'a, L2>,
578 via: N,
579 ) -> Stream<
580 T,
581 Cluster<'a, L2, NoConsistency>,
582 Unbounded,
583 <O as MinOrder<N::OrderingGuarantee>>::Min,
584 R,
585 >
586 where
587 T: Serialize + DeserializeOwned,
588 O: MinOrder<N::OrderingGuarantee>,
589 {
590 self.into_keyed().demux(to, via)
591 }
592}
593
594impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
595 #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
596 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
597 /// [`bincode`] to serialize/deserialize messages.
598 ///
599 /// This provides load balancing by evenly distributing work across cluster members. The
600 /// distribution is deterministic based on element order - the first element goes to member 0,
601 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
602 ///
603 /// # Non-Determinism
604 /// The set of cluster members may asynchronously change over time. Each element is distributed
605 /// based on the current cluster membership _at that point in time_. Depending on when cluster
606 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
607 /// membership is stable, the order of members in the round-robin pattern may change across runs.
608 ///
609 /// # Ordering Requirements
610 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
611 /// order of messages and retries affects the round-robin pattern.
612 ///
613 /// # Example
614 /// ```rust
615 /// # #[cfg(feature = "deploy")] {
616 /// # use hydro_lang::prelude::*;
617 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
618 /// # use futures::StreamExt;
619 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
620 /// let p1 = flow.process::<()>();
621 /// let workers: Cluster<()> = flow.cluster::<()>();
622 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
623 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
624 /// on_worker.send_bincode(&p2)
625 /// # .first().values() // we use first to assert that each member gets one element
626 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
627 /// // - MemberId::<()>(?): [1]
628 /// // - MemberId::<()>(?): [2]
629 /// // - MemberId::<()>(?): [3]
630 /// // - MemberId::<()>(?): [4]
631 /// # }, |mut stream| async move {
632 /// # let mut results = Vec::new();
633 /// # for w in 0..4 {
634 /// # results.push(stream.next().await.unwrap());
635 /// # }
636 /// # results.sort();
637 /// # assert_eq!(results, vec![1, 2, 3, 4]);
638 /// # }));
639 /// # }
640 /// ```
641 pub fn round_robin_bincode<L2: 'a>(
642 self,
643 other: &Cluster<'a, L2>,
644 nondet_membership: NonDet,
645 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
646 where
647 T: Serialize + DeserializeOwned,
648 {
649 self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
650 }
651
652 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
653 /// the configuration in `via` to set up the message transport.
654 ///
655 /// This provides load balancing by evenly distributing work across cluster members. The
656 /// distribution is deterministic based on element order - the first element goes to member 0,
657 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
658 ///
659 /// # Non-Determinism
660 /// The set of cluster members may asynchronously change over time. Each element is distributed
661 /// based on the current cluster membership _at that point in time_. Depending on when cluster
662 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
663 /// membership is stable, the order of members in the round-robin pattern may change across runs.
664 ///
665 /// # Ordering Requirements
666 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
667 /// order of messages and retries affects the round-robin pattern.
668 ///
669 /// # Example
670 /// ```rust
671 /// # #[cfg(feature = "deploy")] {
672 /// # use hydro_lang::prelude::*;
673 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
674 /// # use futures::StreamExt;
675 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
676 /// let p1 = flow.process::<()>();
677 /// let workers: Cluster<()> = flow.cluster::<()>();
678 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
679 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
680 /// on_worker.send(&p2, TCP.fail_stop().bincode())
681 /// # .first().values() // we use first to assert that each member gets one element
682 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
683 /// // - MemberId::<()>(?): [1]
684 /// // - MemberId::<()>(?): [2]
685 /// // - MemberId::<()>(?): [3]
686 /// // - MemberId::<()>(?): [4]
687 /// # }, |mut stream| async move {
688 /// # let mut results = Vec::new();
689 /// # for w in 0..4 {
690 /// # results.push(stream.next().await.unwrap());
691 /// # }
692 /// # results.sort();
693 /// # assert_eq!(results, vec![1, 2, 3, 4]);
694 /// # }));
695 /// # }
696 /// ```
697 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
698 self,
699 to: &Cluster<'a, L2>,
700 via: N,
701 nondet_membership: NonDet,
702 ) -> Stream<T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
703 where
704 T: Serialize + DeserializeOwned,
705 {
706 let ids = track_membership(self.location.source_cluster_membership_stream(
707 to,
708 nondet!(/** dropped prefixes don't affect broadcast */),
709 ));
710 sliced! {
711 let members_snapshot = use(ids, nondet_membership);
712 let elements = use(self.enumerate(), nondet_membership);
713
714 let current_members = members_snapshot
715 .filter(q!(|b| *b))
716 .keys()
717 .assume_ordering::<TotalOrder>(nondet_membership)
718 .collect_vec();
719
720 elements
721 .cross_singleton(current_members)
722 .filter_map(q!(|(data, members)| {
723 if members.is_empty() {
724 None
725 } else {
726 Some((members[data.0 % members.len()].clone(), data.1))
727 }
728 }))
729 }
730 .demux(to, via)
731 }
732}
733
734impl<'a, T, L, B: Boundedness, C: Consistency>
735 Stream<T, Cluster<'a, L, C>, B, TotalOrder, ExactlyOnce>
736{
737 #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
738 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
739 /// [`bincode`] to serialize/deserialize messages.
740 ///
741 /// This provides load balancing by evenly distributing work across cluster members. The
742 /// distribution is deterministic based on element order - the first element goes to member 0,
743 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
744 ///
745 /// # Non-Determinism
746 /// The set of cluster members may asynchronously change over time. Each element is distributed
747 /// based on the current cluster membership _at that point in time_. Depending on when cluster
748 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
749 /// membership is stable, the order of members in the round-robin pattern may change across runs.
750 ///
751 /// # Ordering Requirements
752 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
753 /// order of messages and retries affects the round-robin pattern.
754 ///
755 /// # Example
756 /// ```rust
757 /// # #[cfg(feature = "deploy")] {
758 /// # use hydro_lang::prelude::*;
759 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
760 /// # use hydro_lang::location::MemberId;
761 /// # use futures::StreamExt;
762 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
763 /// let p1 = flow.process::<()>();
764 /// let workers1: Cluster<()> = flow.cluster::<()>();
765 /// let workers2: Cluster<()> = flow.cluster::<()>();
766 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
767 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
768 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
769 /// on_worker2.send_bincode(&p2)
770 /// # .entries()
771 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
772 /// # }, |mut stream| async move {
773 /// # let mut results = Vec::new();
774 /// # let mut locations = std::collections::HashSet::new();
775 /// # for w in 0..=16 {
776 /// # let (location, v) = stream.next().await.unwrap();
777 /// # locations.insert(location);
778 /// # results.push(v);
779 /// # }
780 /// # results.sort();
781 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
782 /// # assert_eq!(locations.len(), 16);
783 /// # }));
784 /// # }
785 /// ```
786 pub fn round_robin_bincode<L2: 'a>(
787 self,
788 other: &Cluster<'a, L2>,
789 nondet_membership: NonDet,
790 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
791 where
792 T: Serialize + DeserializeOwned,
793 {
794 self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
795 }
796
797 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
798 /// the configuration in `via` to set up the message transport.
799 ///
800 /// This provides load balancing by evenly distributing work across cluster members. The
801 /// distribution is deterministic based on element order - the first element goes to member 0,
802 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
803 ///
804 /// # Non-Determinism
805 /// The set of cluster members may asynchronously change over time. Each element is distributed
806 /// based on the current cluster membership _at that point in time_. Depending on when cluster
807 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
808 /// membership is stable, the order of members in the round-robin pattern may change across runs.
809 ///
810 /// # Ordering Requirements
811 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
812 /// order of messages and retries affects the round-robin pattern.
813 ///
814 /// # Example
815 /// ```rust
816 /// # #[cfg(feature = "deploy")] {
817 /// # use hydro_lang::prelude::*;
818 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
819 /// # use hydro_lang::location::MemberId;
820 /// # use futures::StreamExt;
821 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
822 /// let p1 = flow.process::<()>();
823 /// let workers1: Cluster<()> = flow.cluster::<()>();
824 /// let workers2: Cluster<()> = flow.cluster::<()>();
825 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
826 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
827 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
828 /// on_worker2.send(&p2, TCP.fail_stop().bincode())
829 /// # .entries()
830 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
831 /// # }, |mut stream| async move {
832 /// # let mut results = Vec::new();
833 /// # let mut locations = std::collections::HashSet::new();
834 /// # for w in 0..=16 {
835 /// # let (location, v) = stream.next().await.unwrap();
836 /// # locations.insert(location);
837 /// # results.push(v);
838 /// # }
839 /// # results.sort();
840 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
841 /// # assert_eq!(locations.len(), 16);
842 /// # }));
843 /// # }
844 /// ```
845 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
846 self,
847 to: &Cluster<'a, L2>,
848 via: N,
849 nondet_membership: NonDet,
850 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
851 where
852 T: Serialize + DeserializeOwned,
853 {
854 let ids = track_membership(self.location.source_cluster_membership_stream(
855 to,
856 nondet!(/** dropped prefixes don't affect broadcast */),
857 ));
858 sliced! {
859 let members_snapshot = use(ids, nondet_membership);
860 let elements = use(self.enumerate(), nondet_membership);
861
862 let current_members = members_snapshot
863 .filter(q!(|b| *b))
864 .keys()
865 .assume_ordering::<TotalOrder>(nondet_membership)
866 .collect_vec();
867
868 elements
869 .cross_singleton(current_members)
870 .filter_map(q!(|(data, members)| {
871 if members.is_empty() {
872 None
873 } else {
874 Some((members[data.0 % members.len()].clone(), data.1))
875 }
876 }))
877 }
878 .demux(to, via)
879 }
880}
881
882impl<'a, T, L, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
883 Stream<T, Cluster<'a, L, C>, B, O, R>
884{
885 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
886 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
887 /// using [`bincode`] to serialize/deserialize messages.
888 ///
889 /// Each cluster member sends its local stream elements, and they are collected at the destination
890 /// as a [`KeyedStream`] where keys identify the source cluster member.
891 ///
892 /// # Example
893 /// ```rust
894 /// # #[cfg(feature = "deploy")] {
895 /// # use hydro_lang::prelude::*;
896 /// # use futures::StreamExt;
897 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
898 /// let workers: Cluster<()> = flow.cluster::<()>();
899 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
900 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
901 /// # all_received.entries()
902 /// # }, |mut stream| async move {
903 /// // if there are 4 members in the cluster, we should receive 4 elements
904 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
905 /// # let mut results = Vec::new();
906 /// # for w in 0..4 {
907 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
908 /// # }
909 /// # results.sort();
910 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
911 /// # }));
912 /// # }
913 /// ```
914 ///
915 /// If you don't need to know the source for each element, you can use `.values()`
916 /// to get just the data:
917 /// ```rust
918 /// # #[cfg(feature = "deploy")] {
919 /// # use hydro_lang::prelude::*;
920 /// # use hydro_lang::live_collections::stream::NoOrder;
921 /// # use futures::StreamExt;
922 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
923 /// # let workers: Cluster<()> = flow.cluster::<()>();
924 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
925 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
926 /// # values
927 /// # }, |mut stream| async move {
928 /// # let mut results = Vec::new();
929 /// # for w in 0..4 {
930 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
931 /// # }
932 /// # results.sort();
933 /// // if there are 4 members in the cluster, we should receive 4 elements
934 /// // 1, 1, 1, 1
935 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
936 /// # }));
937 /// # }
938 /// ```
939 pub fn send_bincode<L2>(
940 self,
941 other: &Process<'a, L2>,
942 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
943 where
944 T: Serialize + DeserializeOwned,
945 {
946 self.send(other, TCP.fail_stop().bincode())
947 }
948
949 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
950 /// using the configuration in `via` to set up the message transport.
951 ///
952 /// Each cluster member sends its local stream elements, and they are collected at the destination
953 /// as a [`KeyedStream`] where keys identify the source cluster member.
954 ///
955 /// # Example
956 /// ```rust
957 /// # #[cfg(feature = "deploy")] {
958 /// # use hydro_lang::prelude::*;
959 /// # use futures::StreamExt;
960 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
961 /// let workers: Cluster<()> = flow.cluster::<()>();
962 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
963 /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
964 /// # all_received.entries()
965 /// # }, |mut stream| async move {
966 /// // if there are 4 members in the cluster, we should receive 4 elements
967 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
968 /// # let mut results = Vec::new();
969 /// # for w in 0..4 {
970 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
971 /// # }
972 /// # results.sort();
973 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
974 /// # }));
975 /// # }
976 /// ```
977 ///
978 /// If you don't need to know the source for each element, you can use `.values()`
979 /// to get just the data:
980 /// ```rust
981 /// # #[cfg(feature = "deploy")] {
982 /// # use hydro_lang::prelude::*;
983 /// # use hydro_lang::live_collections::stream::NoOrder;
984 /// # use futures::StreamExt;
985 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
986 /// # let workers: Cluster<()> = flow.cluster::<()>();
987 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
988 /// let values: Stream<i32, _, _, NoOrder> =
989 /// numbers.send(&process, TCP.fail_stop().bincode()).values();
990 /// # values
991 /// # }, |mut stream| async move {
992 /// # let mut results = Vec::new();
993 /// # for w in 0..4 {
994 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
995 /// # }
996 /// # results.sort();
997 /// // if there are 4 members in the cluster, we should receive 4 elements
998 /// // 1, 1, 1, 1
999 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
1000 /// # }));
1001 /// # }
1002 /// ```
1003 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1004 pub fn send<L2, N: NetworkFor<T>>(
1005 self,
1006 to: &Process<'a, L2>,
1007 via: N,
1008 ) -> KeyedStream<
1009 MemberId<L>,
1010 T,
1011 Process<'a, L2>,
1012 Unbounded,
1013 <O as MinOrder<N::OrderingGuarantee>>::Min,
1014 R,
1015 >
1016 where
1017 T: Serialize + DeserializeOwned,
1018 O: MinOrder<N::OrderingGuarantee>,
1019 {
1020 let serialize_pipeline = Some(N::serialize_thunk(false));
1021
1022 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
1023
1024 let name = via.name();
1025 if to.multiversioned() && name.is_none() {
1026 panic!(
1027 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
1028 );
1029 }
1030
1031 let raw_stream: Stream<
1032 (MemberId<L>, T),
1033 Process<'a, L2>,
1034 Unbounded,
1035 <O as MinOrder<N::OrderingGuarantee>>::Min,
1036 R,
1037 > = Stream::new(
1038 to.clone(),
1039 HydroNode::Network {
1040 name: name.map(ToOwned::to_owned),
1041 networking_info: N::networking_info(),
1042 serialize_fn: serialize_pipeline.map(|e| e.into()),
1043 instantiate_fn: DebugInstantiate::Building,
1044 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
1045 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1046 metadata: to.new_node_metadata(Stream::<
1047 (MemberId<L>, T),
1048 Process<'a, L2>,
1049 Unbounded,
1050 <O as MinOrder<N::OrderingGuarantee>>::Min,
1051 R,
1052 >::collection_kind()),
1053 },
1054 );
1055
1056 raw_stream.into_keyed()
1057 }
1058
1059 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
1060 /// Broadcasts elements of this stream at each source member to all members of a destination
1061 /// cluster, using [`bincode`] to serialize/deserialize messages.
1062 ///
1063 /// Each source member sends each of its stream elements to **every** member of the cluster
1064 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
1065 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
1066 /// **only data elements** and sends each element to all cluster members.
1067 ///
1068 /// # Non-Determinism
1069 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1070 /// to the current cluster members known _at that point in time_ at the source member. Depending
1071 /// on when each source member is notified of membership changes, it will broadcast each element
1072 /// to different members.
1073 ///
1074 /// # Example
1075 /// ```rust
1076 /// # #[cfg(feature = "deploy")] {
1077 /// # use hydro_lang::prelude::*;
1078 /// # use hydro_lang::location::MemberId;
1079 /// # use futures::StreamExt;
1080 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1081 /// # type Source = ();
1082 /// # type Destination = ();
1083 /// let source: Cluster<Source> = flow.cluster::<Source>();
1084 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1085 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1086 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
1087 /// # on_destination.entries().send_bincode(&p2).entries()
1088 /// // if there are 4 members in the desination, each receives one element from each source member
1089 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1090 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1091 /// // - ...
1092 /// # }, |mut stream| async move {
1093 /// # let mut results = Vec::new();
1094 /// # for w in 0..16 {
1095 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1096 /// # }
1097 /// # results.sort();
1098 /// # assert_eq!(results, vec![
1099 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1100 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1101 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1102 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1103 /// # ]);
1104 /// # }));
1105 /// # }
1106 /// ```
1107 pub fn broadcast_bincode<L2: 'a>(
1108 self,
1109 other: &Cluster<'a, L2>,
1110 nondet_membership: NonDet,
1111 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1112 where
1113 T: Clone + Serialize + DeserializeOwned,
1114 {
1115 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
1116 }
1117
1118 /// Broadcasts elements of this stream at each source member to all members of a destination
1119 /// cluster, using the configuration in `via` to set up the message transport.
1120 ///
1121 /// Each source member sends each of its stream elements to **every** member of the cluster
1122 /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1123 /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1124 /// **only data elements** and sends each element to all cluster members.
1125 ///
1126 /// # Non-Determinism
1127 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1128 /// to the current cluster members known _at that point in time_ at the source member. Depending
1129 /// on when each source member is notified of membership changes, it will broadcast each element
1130 /// to different members.
1131 ///
1132 /// # Example
1133 /// ```rust
1134 /// # #[cfg(feature = "deploy")] {
1135 /// # use hydro_lang::prelude::*;
1136 /// # use hydro_lang::location::MemberId;
1137 /// # use futures::StreamExt;
1138 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1139 /// # type Source = ();
1140 /// # type Destination = ();
1141 /// let source: Cluster<Source> = flow.cluster::<Source>();
1142 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1143 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1144 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1145 /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1146 /// // if there are 4 members in the desination, each receives one element from each source member
1147 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1148 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1149 /// // - ...
1150 /// # }, |mut stream| async move {
1151 /// # let mut results = Vec::new();
1152 /// # for w in 0..16 {
1153 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1154 /// # }
1155 /// # results.sort();
1156 /// # assert_eq!(results, vec![
1157 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1158 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1159 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1160 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1161 /// # ]);
1162 /// # }));
1163 /// # }
1164 /// ```
1165 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1166 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1167 self,
1168 to: &Cluster<'a, L2>,
1169 via: N,
1170 nondet_membership: NonDet,
1171 ) -> KeyedStream<
1172 MemberId<L>,
1173 T,
1174 Cluster<'a, L2>,
1175 Unbounded,
1176 <O as MinOrder<N::OrderingGuarantee>>::Min,
1177 R,
1178 >
1179 where
1180 T: Clone + Serialize + DeserializeOwned,
1181 O: MinOrder<N::OrderingGuarantee>,
1182 {
1183 let ids = track_membership(self.location.source_cluster_membership_stream(
1184 to,
1185 nondet!(/** dropped prefixes don't affect broadcast */),
1186 ));
1187 sliced! {
1188 let members_snapshot = use(ids, nondet_membership);
1189 let elements = use(self, nondet_membership);
1190
1191 let current_members = members_snapshot.filter(q!(|b| *b));
1192 elements.repeat_with_keys(current_members)
1193 }
1194 .demux(to, via)
1195 }
1196
1197 /// Broadcasts elements of this stream at each source member to all members of a destination
1198 /// cluster, assuming membership is closed (fixed at deploy time).
1199 ///
1200 /// Unlike [`Stream::broadcast`], this does not require a [`NonDet`] guard.
1201 /// The membership set is obtained from deploy metadata via [`ClusterIds`], making the
1202 /// broadcast fully deterministic. Since all source members send to all destination members
1203 /// and membership is fixed, every destination member receives the same set of elements
1204 /// from each source, guaranteeing [`EventualConsistency`].
1205 ///
1206 /// This is only available in deployment targets with static cluster membership
1207 /// (legacy Hydro Deploy and simulation). On dynamic targets, use [`Stream::broadcast`].
1208 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1209 pub fn broadcast_closed<L2: 'a, N: NetworkFor<T>>(
1210 self,
1211 to: &Cluster<'a, L2>,
1212 via: N,
1213 ) -> KeyedStream<
1214 MemberId<L>,
1215 T,
1216 Cluster<'a, L2, EventualConsistency>,
1217 Unbounded,
1218 <O as MinOrder<N::OrderingGuarantee>>::Min,
1219 R,
1220 >
1221 where
1222 T: Clone + Serialize + DeserializeOwned,
1223 O: MinOrder<N::OrderingGuarantee>,
1224 {
1225 let cluster_ids = ClusterIds {
1226 key: to.key,
1227 _phantom: PhantomData,
1228 };
1229 let member_ids = self
1230 .location
1231 .source_iter(q!(cluster_ids
1232 .iter()
1233 .map(|id| MemberId::from_tagless(id.clone()))))
1234 .assert_has_consistency_of_trusted::<Cluster<'a, L, C>>(manual_proof!(
1235 /// ClusterIds is deploy-time metadata, identical on every cluster member.
1236 ));
1237
1238 self.cross_product(member_ids)
1239 .map(q!(|(data, member_id)| (member_id, data)))
1240 .into_keyed()
1241 .demux(to, via)
1242 .assert_has_consistency_of_trusted(manual_proof!(
1243 /// Closed broadcast with fixed membership: every source member sends to every
1244 /// destination member, so all destinations materialize the same elements.
1245 ))
1246 }
1247
1248 #[cfg(feature = "sim")]
1249 /// Sends elements of this cluster stream to an external location using bincode serialization.
1250 fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
1251 where
1252 T: Serialize + DeserializeOwned,
1253 {
1254 let serialize_pipeline = Some(serialize_bincode::<T>(false));
1255
1256 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
1257
1258 let external_port_id = flow_state_borrow.next_external_port();
1259
1260 flow_state_borrow.push_root(HydroRoot::SendExternal {
1261 to_external_key: other.key,
1262 to_port_id: external_port_id,
1263 to_many: false,
1264 unpaired: true,
1265 serialize_fn: serialize_pipeline.map(|e| e.into()),
1266 instantiate_fn: DebugInstantiate::Building,
1267 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1268 op_metadata: HydroIrOpMetadata::new(),
1269 });
1270
1271 ExternalBincodeStream {
1272 process_key: other.key,
1273 port_id: external_port_id,
1274 _phantom: PhantomData,
1275 }
1276 }
1277
1278 #[cfg(feature = "sim")]
1279 /// Sets up a simulation output port for this cluster stream, allowing test code
1280 /// to receive `(member_id, T)` pairs during simulation.
1281 pub fn sim_cluster_output(self) -> crate::sim::SimClusterReceiver<T, O, R>
1282 where
1283 T: Serialize + DeserializeOwned,
1284 {
1285 let external_location: External<'a, ()> = External {
1286 key: LocationKey::FIRST,
1287 flow_state: self.location.flow_state().clone(),
1288 _phantom: PhantomData,
1289 };
1290
1291 let external = self.send_bincode_external(&external_location);
1292
1293 crate::sim::SimClusterReceiver(external.port_id, PhantomData)
1294 }
1295}
1296
1297impl<'a, T, L, L2, B: Boundedness, C: Consistency, O: Ordering, R: Retries>
1298 Stream<(MemberId<L2>, T), Cluster<'a, L, C>, B, O, R>
1299{
1300 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1301 /// Sends elements of this stream at each source member to specific members of a destination
1302 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1303 ///
1304 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1305 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1306 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1307 /// all members.
1308 ///
1309 /// Each cluster member sends its local stream elements, and they are collected at each
1310 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1311 ///
1312 /// # Example
1313 /// ```rust
1314 /// # #[cfg(feature = "deploy")] {
1315 /// # use hydro_lang::prelude::*;
1316 /// # use futures::StreamExt;
1317 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1318 /// # type Source = ();
1319 /// # type Destination = ();
1320 /// let source: Cluster<Source> = flow.cluster::<Source>();
1321 /// let to_send: Stream<_, Cluster<_>, _> = source
1322 /// .source_iter(q!(vec![0, 1, 2, 3]))
1323 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1324 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1325 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1326 /// # all_received.entries().send_bincode(&p2).entries()
1327 /// # }, |mut stream| async move {
1328 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1329 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1330 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1331 /// // - ...
1332 /// # let mut results = Vec::new();
1333 /// # for w in 0..16 {
1334 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1335 /// # }
1336 /// # results.sort();
1337 /// # assert_eq!(results, vec![
1338 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1339 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1340 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1341 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1342 /// # ]);
1343 /// # }));
1344 /// # }
1345 /// ```
1346 pub fn demux_bincode(
1347 self,
1348 other: &Cluster<'a, L2>,
1349 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1350 where
1351 T: Serialize + DeserializeOwned,
1352 {
1353 self.demux(other, TCP.fail_stop().bincode())
1354 }
1355
1356 /// Sends elements of this stream at each source member to specific members of a destination
1357 /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1358 /// message transport.
1359 ///
1360 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1361 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1362 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1363 /// all members.
1364 ///
1365 /// Each cluster member sends its local stream elements, and they are collected at each
1366 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1367 ///
1368 /// # Example
1369 /// ```rust
1370 /// # #[cfg(feature = "deploy")] {
1371 /// # use hydro_lang::prelude::*;
1372 /// # use futures::StreamExt;
1373 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1374 /// # type Source = ();
1375 /// # type Destination = ();
1376 /// let source: Cluster<Source> = flow.cluster::<Source>();
1377 /// let to_send: Stream<_, Cluster<_>, _> = source
1378 /// .source_iter(q!(vec![0, 1, 2, 3]))
1379 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1380 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1381 /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1382 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1383 /// # }, |mut stream| async move {
1384 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1385 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1386 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1387 /// // - ...
1388 /// # let mut results = Vec::new();
1389 /// # for w in 0..16 {
1390 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1391 /// # }
1392 /// # results.sort();
1393 /// # assert_eq!(results, vec![
1394 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1395 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1396 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1397 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1398 /// # ]);
1399 /// # }));
1400 /// # }
1401 /// ```
1402 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1403 pub fn demux<N: NetworkFor<T>>(
1404 self,
1405 to: &Cluster<'a, L2>,
1406 via: N,
1407 ) -> KeyedStream<
1408 MemberId<L>,
1409 T,
1410 Cluster<'a, L2, NoConsistency>,
1411 Unbounded,
1412 <O as MinOrder<N::OrderingGuarantee>>::Min,
1413 R,
1414 >
1415 where
1416 T: Serialize + DeserializeOwned,
1417 O: MinOrder<N::OrderingGuarantee>,
1418 {
1419 self.into_keyed().demux(to, via)
1420 }
1421}
1422
1423#[cfg(test)]
1424mod tests {
1425 #[cfg(feature = "sim")]
1426 use stageleft::q;
1427
1428 #[cfg(feature = "sim")]
1429 use crate::live_collections::sliced::sliced;
1430 #[cfg(feature = "sim")]
1431 use crate::location::{Location, MemberId};
1432 #[cfg(feature = "sim")]
1433 use crate::networking::TCP;
1434 #[cfg(feature = "sim")]
1435 use crate::nondet::nondet;
1436 #[cfg(feature = "sim")]
1437 use crate::prelude::FlowBuilder;
1438
1439 #[cfg(feature = "sim")]
1440 #[test]
1441 fn sim_send_bincode_o2o() {
1442 use crate::networking::TCP;
1443
1444 let mut flow = FlowBuilder::new();
1445 let node = flow.process::<()>();
1446 let node2 = flow.process::<()>();
1447
1448 let (in_send, input) = node.sim_input();
1449
1450 let out_recv = input
1451 .send(&node2, TCP.fail_stop().bincode())
1452 .batch(&node2.tick(), nondet!(/** test */))
1453 .count()
1454 .all_ticks()
1455 .sim_output();
1456
1457 let instances = flow.sim().exhaustive(async || {
1458 in_send.send(());
1459 in_send.send(());
1460 in_send.send(());
1461
1462 let received = out_recv.collect::<Vec<_>>().await;
1463 assert!(received.into_iter().sum::<usize>() == 3);
1464 });
1465
1466 assert_eq!(instances, 4); // 2^{3 - 1}
1467 }
1468
1469 #[cfg(feature = "sim")]
1470 #[test]
1471 fn sim_send_bincode_m2o() {
1472 let mut flow = FlowBuilder::new();
1473 let cluster = flow.cluster::<()>();
1474 let node = flow.process::<()>();
1475
1476 let input = cluster.source_iter(q!(vec![1]));
1477
1478 let out_recv = input
1479 .send(&node, TCP.fail_stop().bincode())
1480 .entries()
1481 .batch(&node.tick(), nondet!(/** test */))
1482 .all_ticks()
1483 .sim_output();
1484
1485 let instances = flow
1486 .sim()
1487 .with_cluster_size(&cluster, 4)
1488 .exhaustive(async || {
1489 out_recv
1490 .assert_yields_only_unordered(vec![
1491 (MemberId::from_raw_id(0), 1),
1492 (MemberId::from_raw_id(1), 1),
1493 (MemberId::from_raw_id(2), 1),
1494 (MemberId::from_raw_id(3), 1),
1495 ])
1496 .await
1497 });
1498
1499 assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
1500 }
1501
1502 #[cfg(feature = "sim")]
1503 #[test]
1504 fn sim_send_bincode_multiple_m2o() {
1505 let mut flow = FlowBuilder::new();
1506 let cluster1 = flow.cluster::<()>();
1507 let cluster2 = flow.cluster::<()>();
1508 let node = flow.process::<()>();
1509
1510 let out_recv_1 = cluster1
1511 .source_iter(q!(vec![1]))
1512 .send(&node, TCP.fail_stop().bincode())
1513 .entries()
1514 .sim_output();
1515
1516 let out_recv_2 = cluster2
1517 .source_iter(q!(vec![2]))
1518 .send(&node, TCP.fail_stop().bincode())
1519 .entries()
1520 .sim_output();
1521
1522 let instances = flow
1523 .sim()
1524 .with_cluster_size(&cluster1, 3)
1525 .with_cluster_size(&cluster2, 4)
1526 .exhaustive(async || {
1527 out_recv_1
1528 .assert_yields_only_unordered(vec![
1529 (MemberId::from_raw_id(0), 1),
1530 (MemberId::from_raw_id(1), 1),
1531 (MemberId::from_raw_id(2), 1),
1532 ])
1533 .await;
1534
1535 out_recv_2
1536 .assert_yields_only_unordered(vec![
1537 (MemberId::from_raw_id(0), 2),
1538 (MemberId::from_raw_id(1), 2),
1539 (MemberId::from_raw_id(2), 2),
1540 (MemberId::from_raw_id(3), 2),
1541 ])
1542 .await;
1543 });
1544
1545 assert_eq!(instances, 1);
1546 }
1547
1548 #[cfg(feature = "sim")]
1549 #[test]
1550 fn sim_send_bincode_o2m() {
1551 let mut flow = FlowBuilder::new();
1552 let cluster = flow.cluster::<()>();
1553 let node = flow.process::<()>();
1554
1555 let input = node.source_iter(q!(vec![
1556 (MemberId::from_raw_id(0), 123),
1557 (MemberId::from_raw_id(1), 456),
1558 ]));
1559
1560 let out_recv = input
1561 .demux(&cluster, TCP.fail_stop().bincode())
1562 .map(q!(|x| x + 1))
1563 .send(&node, TCP.fail_stop().bincode())
1564 .entries()
1565 .sim_output();
1566
1567 flow.sim()
1568 .with_cluster_size(&cluster, 4)
1569 .exhaustive(async || {
1570 out_recv
1571 .assert_yields_only_unordered(vec![
1572 (MemberId::from_raw_id(0), 124),
1573 (MemberId::from_raw_id(1), 457),
1574 ])
1575 .await
1576 });
1577 }
1578
1579 #[cfg(feature = "sim")]
1580 #[test]
1581 fn sim_broadcast_bincode_o2m() {
1582 let mut flow = FlowBuilder::new();
1583 let cluster = flow.cluster::<()>();
1584 let node = flow.process::<()>();
1585
1586 let input = node.source_iter(q!(vec![123, 456]));
1587
1588 let out_recv = input
1589 .broadcast(&cluster, TCP.fail_stop().bincode(), nondet!(/** test */))
1590 .map(q!(|x| x + 1))
1591 .send(&node, TCP.fail_stop().bincode())
1592 .entries()
1593 .sim_output();
1594
1595 let mut c_1_produced = false;
1596 let mut c_2_produced = false;
1597 let mut c_1_saw_457_but_not_124 = false;
1598
1599 flow.sim()
1600 .with_cluster_size(&cluster, 2)
1601 .exhaustive(async || {
1602 let all_out = out_recv.collect_sorted::<Vec<_>>().await;
1603
1604 // check that order is preserved
1605 if all_out.contains(&(MemberId::from_raw_id(0), 124)) {
1606 assert!(all_out.contains(&(MemberId::from_raw_id(0), 457)));
1607 c_1_produced = true;
1608 }
1609
1610 if all_out.contains(&(MemberId::from_raw_id(1), 124)) {
1611 assert!(all_out.contains(&(MemberId::from_raw_id(1), 457)));
1612 c_2_produced = true;
1613 }
1614
1615 if all_out.contains(&(MemberId::from_raw_id(0), 457))
1616 && !all_out.contains(&(MemberId::from_raw_id(0), 124))
1617 {
1618 c_1_saw_457_but_not_124 = true;
1619 }
1620 });
1621
1622 assert!(c_1_produced && c_2_produced); // in at least one execution each, the cluster member received both messages
1623
1624 // in at least one execution, the cluster member received 457 but not 124, this tests
1625 // that the simulator properly explores dynamic membership additions (a member that joins after 123 is broadcast)
1626 assert!(c_1_saw_457_but_not_124);
1627 }
1628
1629 #[cfg(feature = "sim")]
1630 #[test]
1631 fn sim_send_bincode_m2m() {
1632 let mut flow = FlowBuilder::new();
1633 let cluster = flow.cluster::<()>();
1634 let node = flow.process::<()>();
1635
1636 let input = node.source_iter(q!(vec![
1637 (MemberId::from_raw_id(0), 123),
1638 (MemberId::from_raw_id(1), 456),
1639 ]));
1640
1641 let out_recv = input
1642 .demux(&cluster, TCP.fail_stop().bincode())
1643 .map(q!(|x| x + 1))
1644 .flat_map_ordered(q!(|x| vec![
1645 (MemberId::from_raw_id(0), x),
1646 (MemberId::from_raw_id(1), x),
1647 ]))
1648 .demux(&cluster, TCP.fail_stop().bincode())
1649 .entries()
1650 .send(&node, TCP.fail_stop().bincode())
1651 .entries()
1652 .sim_output();
1653
1654 flow.sim()
1655 .with_cluster_size(&cluster, 4)
1656 .exhaustive(async || {
1657 out_recv
1658 .assert_yields_only_unordered(vec![
1659 (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1660 (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1661 (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1662 (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1663 ])
1664 .await
1665 });
1666 }
1667
1668 #[cfg(feature = "sim")]
1669 #[test]
1670 fn sim_lossy_delayed_forever_o2o() {
1671 use std::collections::HashSet;
1672
1673 use crate::properties::manual_proof;
1674
1675 let mut flow = FlowBuilder::new();
1676 let node = flow.process::<()>();
1677 let node2 = flow.process::<()>();
1678
1679 let received = node
1680 .source_iter(q!(0..3_u32))
1681 .send(&node2, TCP.lossy_delayed_forever().bincode())
1682 .fold(
1683 q!(|| std::collections::HashSet::<u32>::new()),
1684 q!(
1685 |set, v| {
1686 set.insert(v);
1687 },
1688 commutative = manual_proof!(/** set insert is commutative */)
1689 ),
1690 );
1691
1692 let out_recv = sliced! {
1693 let snapshot = use(received, nondet!(/** test */));
1694 snapshot.into_stream()
1695 }
1696 .sim_output();
1697
1698 let mut saw_non_contiguous = false;
1699
1700 flow.sim().test_safety_only().exhaustive(async || {
1701 let snapshots = out_recv.collect::<Vec<HashSet<u32>>>().await;
1702
1703 // Check each individual snapshot for a non-contiguous subset.
1704 for set in &snapshots {
1705 #[expect(clippy::disallowed_methods, reason = "min / max are deterministic")]
1706 if set.len() >= 2 && set.len() < 3 {
1707 let min = *set.iter().min().unwrap();
1708 let max = *set.iter().max().unwrap();
1709 if set.len() < (max - min + 1) as usize {
1710 saw_non_contiguous = true;
1711 }
1712 }
1713 }
1714 });
1715
1716 assert!(
1717 saw_non_contiguous,
1718 "Expected at least one execution with a non-contiguous subset of inputs"
1719 );
1720 }
1721
1722 #[cfg(feature = "sim")]
1723 #[test]
1724 fn sim_broadcast_closed_o2m() {
1725 let mut flow = FlowBuilder::new();
1726 let cluster = flow.cluster::<()>();
1727 let node = flow.process::<()>();
1728
1729 let input = node.source_iter(q!(vec![123, 456]));
1730
1731 let out_recv = input
1732 .broadcast_closed(&cluster, TCP.fail_stop().bincode())
1733 .send(&node, TCP.fail_stop().bincode())
1734 .entries()
1735 .sim_output();
1736
1737 flow.sim()
1738 .with_cluster_size(&cluster, 2)
1739 .exhaustive(async || {
1740 out_recv
1741 .assert_yields_only_unordered(vec![
1742 (MemberId::from_raw_id(0), 123),
1743 (MemberId::from_raw_id(0), 456),
1744 (MemberId::from_raw_id(1), 123),
1745 (MemberId::from_raw_id(1), 456),
1746 ])
1747 .await
1748 });
1749 }
1750
1751 #[cfg(feature = "sim")]
1752 #[test]
1753 fn sim_broadcast_closed_m2m() {
1754 let mut flow = FlowBuilder::new();
1755 let source = flow.cluster::<()>();
1756 let dest: crate::location::Cluster<'_, ()> = flow.cluster::<()>();
1757 let node = flow.process::<()>();
1758
1759 let input = source.source_iter(q!(vec![123]));
1760
1761 // Broadcast from source cluster to dest cluster, then collect at a process.
1762 let out_recv = input
1763 .broadcast_closed(&dest, TCP.fail_stop().bincode())
1764 .entries()
1765 .send(&node, TCP.fail_stop().bincode())
1766 .entries()
1767 .sim_output();
1768
1769 flow.sim()
1770 .with_cluster_size(&source, 2)
1771 .with_cluster_size(&dest, 2)
1772 .exhaustive(async || {
1773 // Each source member (0, 1) broadcasts 123 to each dest member (0, 1).
1774 // The dest members then send to the process keyed by dest member id.
1775 // Each dest member receives (source_0, 123) and (source_1, 123).
1776 out_recv
1777 .assert_yields_only_unordered(vec![
1778 (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 123)),
1779 (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 123)),
1780 (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 123)),
1781 (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 123)),
1782 ])
1783 .await
1784 });
1785 }
1786}