hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::stream::Retries;
16#[cfg(stageleft_runtime)]
17use crate::location::dynamic::DynLocation;
18use crate::location::external_process::ExternalBincodeStream;
19use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
20use crate::nondet::NonDet;
21use crate::staging_util::get_this_crate;
22
23// same as the one in `hydro_std`, but internal use only
24fn track_membership<'a, C, L: Location<'a> + NoTick>(
25 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
26) -> KeyedSingleton<MemberId<C>, (), L, Unbounded> {
27 membership
28 .fold(
29 q!(|| false),
30 q!(|present, event| {
31 match event {
32 MembershipEvent::Joined => *present = true,
33 MembershipEvent::Left => *present = false,
34 }
35 }),
36 )
37 .filter_map(q!(|v| if v { Some(()) } else { None }))
38}
39
40fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
41 let root = get_this_crate();
42
43 if is_demux {
44 parse_quote! {
45 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::location::MemberId<_>, #t_type), _>(
46 |(id, data)| {
47 (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
48 }
49 )
50 }
51 } else {
52 parse_quote! {
53 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
54 |data| {
55 #root::runtime_support::bincode::serialize(&data).unwrap().into()
56 }
57 )
58 }
59 }
60}
61
62pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
63 serialize_bincode_with_type(is_demux, "e_type::<T>())
64}
65
66fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
67 let root = get_this_crate();
68
69 if let Some(c_type) = tagged {
70 parse_quote! {
71 |res| {
72 let (id, b) = res.unwrap();
73 (#root::location::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
74 }
75 }
76 } else {
77 parse_quote! {
78 |res| {
79 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
80 }
81 }
82 }
83}
84
85pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
86 deserialize_bincode_with_type(tagged, "e_type::<T>())
87}
88
89impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
90 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
91 /// using [`bincode`] to serialize/deserialize messages.
92 ///
93 /// The returned stream captures the elements received at the destination, where values will
94 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
95 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
96 /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
97 /// dropped no further messages will be sent.
98 ///
99 /// # Example
100 /// ```rust
101 /// # use hydro_lang::prelude::*;
102 /// # use futures::StreamExt;
103 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
104 /// let p1 = flow.process::<()>();
105 /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
106 /// let p2 = flow.process::<()>();
107 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
108 /// // 1, 2, 3
109 /// # on_p2.send_bincode(&p_out)
110 /// # }, |mut stream| async move {
111 /// # for w in 1..=3 {
112 /// # assert_eq!(stream.next().await, Some(w));
113 /// # }
114 /// # }));
115 /// ```
116 pub fn send_bincode<L2>(
117 self,
118 other: &Process<'a, L2>,
119 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
120 where
121 T: Serialize + DeserializeOwned,
122 {
123 let serialize_pipeline = Some(serialize_bincode::<T>(false));
124
125 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
126
127 Stream::new(
128 other.clone(),
129 HydroNode::Network {
130 serialize_fn: serialize_pipeline.map(|e| e.into()),
131 instantiate_fn: DebugInstantiate::Building,
132 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
133 input: Box::new(self.ir_node.into_inner()),
134 metadata: other.new_node_metadata(
135 Stream::<T, Process<'a, L2>, Unbounded, O, R>::collection_kind(),
136 ),
137 },
138 )
139 }
140
141 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
142 /// using [`bincode`] to serialize/deserialize messages.
143 ///
144 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
145 /// membership information. This is a common pattern in distributed systems for broadcasting data to
146 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
147 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
148 /// each element to all cluster members.
149 ///
150 /// # Non-Determinism
151 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
152 /// to the current cluster members _at that point in time_. Depending on when we are notified of
153 /// membership changes, we will broadcast each element to different members.
154 ///
155 /// # Example
156 /// ```rust
157 /// # use hydro_lang::prelude::*;
158 /// # use futures::StreamExt;
159 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
160 /// let p1 = flow.process::<()>();
161 /// let workers: Cluster<()> = flow.cluster::<()>();
162 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
163 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
164 /// # on_worker.send_bincode(&p2).entries()
165 /// // if there are 4 members in the cluster, each receives one element
166 /// // - MemberId::<()>(0): [123]
167 /// // - MemberId::<()>(1): [123]
168 /// // - MemberId::<()>(2): [123]
169 /// // - MemberId::<()>(3): [123]
170 /// # }, |mut stream| async move {
171 /// # let mut results = Vec::new();
172 /// # for w in 0..4 {
173 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
174 /// # }
175 /// # results.sort();
176 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
177 /// # }));
178 /// ```
179 pub fn broadcast_bincode<L2: 'a>(
180 self,
181 other: &Cluster<'a, L2>,
182 nondet_membership: NonDet,
183 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
184 where
185 T: Clone + Serialize + DeserializeOwned,
186 {
187 let ids = track_membership(self.location.source_cluster_members(other));
188 let join_tick = self.location.tick();
189 let current_members = ids.snapshot(&join_tick, nondet_membership);
190
191 self.batch(&join_tick, nondet_membership)
192 .repeat_with_keys(current_members)
193 .all_ticks()
194 .demux_bincode(other)
195 }
196
197 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
198 /// serialization. The external process can receive these elements by establishing a TCP
199 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
200 ///
201 /// # Example
202 /// ```rust
203 /// # use hydro_lang::prelude::*;
204 /// # use futures::StreamExt;
205 /// # tokio_test::block_on(async move {
206 /// let flow = FlowBuilder::new();
207 /// let process = flow.process::<()>();
208 /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
209 /// let external = flow.external::<()>();
210 /// let external_handle = numbers.send_bincode_external(&external);
211 ///
212 /// let mut deployment = hydro_deploy::Deployment::new();
213 /// let nodes = flow
214 /// .with_process(&process, deployment.Localhost())
215 /// .with_external(&external, deployment.Localhost())
216 /// .deploy(&mut deployment);
217 ///
218 /// deployment.deploy().await.unwrap();
219 /// // establish the TCP connection
220 /// let mut external_recv_stream = nodes.connect(external_handle).await;
221 /// deployment.start().await.unwrap();
222 ///
223 /// for w in 1..=3 {
224 /// assert_eq!(external_recv_stream.next().await, Some(w));
225 /// }
226 /// # });
227 /// ```
228 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
229 where
230 T: Serialize + DeserializeOwned,
231 {
232 let serialize_pipeline = Some(serialize_bincode::<T>(false));
233
234 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
235
236 let external_key = flow_state_borrow.next_external_out;
237 flow_state_borrow.next_external_out += 1;
238
239 flow_state_borrow.push_root(HydroRoot::SendExternal {
240 to_external_id: other.id,
241 to_key: external_key,
242 to_many: false,
243 serialize_fn: serialize_pipeline.map(|e| e.into()),
244 instantiate_fn: DebugInstantiate::Building,
245 input: Box::new(self.ir_node.into_inner()),
246 op_metadata: HydroIrOpMetadata::new(),
247 });
248
249 ExternalBincodeStream {
250 process_id: other.id,
251 port_id: external_key,
252 _phantom: PhantomData,
253 }
254 }
255}
256
257impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
258 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
259{
260 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
261 /// using [`bincode`] to serialize/deserialize messages.
262 ///
263 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
264 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
265 /// this API allows precise targeting of specific cluster members rather than broadcasting to
266 /// all members.
267 ///
268 /// # Example
269 /// ```rust
270 /// # use hydro_lang::prelude::*;
271 /// # use futures::StreamExt;
272 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
273 /// let p1 = flow.process::<()>();
274 /// let workers: Cluster<()> = flow.cluster::<()>();
275 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
276 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
277 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
278 /// .demux_bincode(&workers);
279 /// # on_worker.send_bincode(&p2).entries()
280 /// // if there are 4 members in the cluster, each receives one element
281 /// // - MemberId::<()>(0): [0]
282 /// // - MemberId::<()>(1): [1]
283 /// // - MemberId::<()>(2): [2]
284 /// // - MemberId::<()>(3): [3]
285 /// # }, |mut stream| async move {
286 /// # let mut results = Vec::new();
287 /// # for w in 0..4 {
288 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
289 /// # }
290 /// # results.sort();
291 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
292 /// # }));
293 /// ```
294 pub fn demux_bincode(
295 self,
296 other: &Cluster<'a, L2>,
297 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
298 where
299 T: Serialize + DeserializeOwned,
300 {
301 self.into_keyed().demux_bincode(other)
302 }
303}
304
305impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
306 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
307 /// [`bincode`] to serialize/deserialize messages.
308 ///
309 /// This provides load balancing by evenly distributing work across cluster members. The
310 /// distribution is deterministic based on element order - the first element goes to member 0,
311 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
312 ///
313 /// # Non-Determinism
314 /// The set of cluster members may asynchronously change over time. Each element is distributed
315 /// based on the current cluster membership _at that point in time_. Depending on when cluster
316 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
317 /// membership is stable, the order of members in the round-robin pattern may change across runs.
318 ///
319 /// # Ordering Requirements
320 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
321 /// order of messages and retries affects the round-robin pattern.
322 ///
323 /// # Example
324 /// ```rust
325 /// # use hydro_lang::prelude::*;
326 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
327 /// # use futures::StreamExt;
328 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
329 /// let p1 = flow.process::<()>();
330 /// let workers: Cluster<()> = flow.cluster::<()>();
331 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
332 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
333 /// on_worker.send_bincode(&p2)
334 /// # .first().values() // we use first to assert that each member gets one element
335 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
336 /// // - MemberId::<()>(?): [1]
337 /// // - MemberId::<()>(?): [2]
338 /// // - MemberId::<()>(?): [3]
339 /// // - MemberId::<()>(?): [4]
340 /// # }, |mut stream| async move {
341 /// # let mut results = Vec::new();
342 /// # for w in 0..4 {
343 /// # results.push(stream.next().await.unwrap());
344 /// # }
345 /// # results.sort();
346 /// # assert_eq!(results, vec![1, 2, 3, 4]);
347 /// # }));
348 /// ```
349 pub fn round_robin_bincode<L2: 'a>(
350 self,
351 other: &Cluster<'a, L2>,
352 nondet_membership: NonDet,
353 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
354 where
355 T: Serialize + DeserializeOwned,
356 {
357 let ids = track_membership(self.location.source_cluster_members(other));
358 let join_tick = self.location.tick();
359 let current_members = ids
360 .snapshot(&join_tick, nondet_membership)
361 .keys()
362 .assume_ordering(nondet_membership)
363 .collect_vec();
364
365 self.enumerate()
366 .batch(&join_tick, nondet_membership)
367 .cross_singleton(current_members)
368 .map(q!(|(data, members)| (
369 members[data.0 % members.len()],
370 data.1
371 )))
372 .all_ticks()
373 .demux_bincode(other)
374 }
375}
376
377impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
378 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
379 /// using [`bincode`] to serialize/deserialize messages.
380 ///
381 /// Each cluster member sends its local stream elements, and they are collected at the destination
382 /// as a [`KeyedStream`] where keys identify the source cluster member.
383 ///
384 /// # Example
385 /// ```rust
386 /// # use hydro_lang::prelude::*;
387 /// # use futures::StreamExt;
388 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
389 /// let workers: Cluster<()> = flow.cluster::<()>();
390 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
391 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
392 /// # all_received.entries()
393 /// # }, |mut stream| async move {
394 /// // if there are 4 members in the cluster, we should receive 4 elements
395 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
396 /// # let mut results = Vec::new();
397 /// # for w in 0..4 {
398 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
399 /// # }
400 /// # results.sort();
401 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
402 /// # }));
403 /// ```
404 ///
405 /// If you don't need to know the source for each element, you can use `.values()`
406 /// to get just the data:
407 /// ```rust
408 /// # use hydro_lang::prelude::*;
409 /// # use hydro_lang::live_collections::stream::NoOrder;
410 /// # use futures::StreamExt;
411 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
412 /// # let workers: Cluster<()> = flow.cluster::<()>();
413 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
414 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
415 /// # values
416 /// # }, |mut stream| async move {
417 /// # let mut results = Vec::new();
418 /// # for w in 0..4 {
419 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
420 /// # }
421 /// # results.sort();
422 /// // if there are 4 members in the cluster, we should receive 4 elements
423 /// // 1, 1, 1, 1
424 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
425 /// # }));
426 /// ```
427 pub fn send_bincode<L2>(
428 self,
429 other: &Process<'a, L2>,
430 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
431 where
432 T: Serialize + DeserializeOwned,
433 {
434 let serialize_pipeline = Some(serialize_bincode::<T>(false));
435
436 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
437
438 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
439 other.clone(),
440 HydroNode::Network {
441 serialize_fn: serialize_pipeline.map(|e| e.into()),
442 instantiate_fn: DebugInstantiate::Building,
443 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
444 input: Box::new(self.ir_node.into_inner()),
445 metadata: other.new_node_metadata(Stream::<
446 (MemberId<L>, T),
447 Process<'a, L2>,
448 Unbounded,
449 O,
450 R,
451 >::collection_kind()),
452 },
453 );
454
455 raw_stream.into_keyed()
456 }
457
458 /// Broadcasts elements of this stream at each source member to all members of a destination
459 /// cluster, using [`bincode`] to serialize/deserialize messages.
460 ///
461 /// Each source member sends each of its stream elements to **every** member of the cluster
462 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
463 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
464 /// **only data elements** and sends each element to all cluster members.
465 ///
466 /// # Non-Determinism
467 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
468 /// to the current cluster members known _at that point in time_ at the source member. Depending
469 /// on when each source member is notified of membership changes, it will broadcast each element
470 /// to different members.
471 ///
472 /// # Example
473 /// ```rust
474 /// # use hydro_lang::prelude::*;
475 /// # use hydro_lang::location::MemberId;
476 /// # use futures::StreamExt;
477 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
478 /// # type Source = ();
479 /// # type Destination = ();
480 /// let source: Cluster<Source> = flow.cluster::<Source>();
481 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
482 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
483 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
484 /// # on_destination.entries().send_bincode(&p2).entries()
485 /// // if there are 4 members in the desination, each receives one element from each source member
486 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
487 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
488 /// // - ...
489 /// # }, |mut stream| async move {
490 /// # let mut results = Vec::new();
491 /// # for w in 0..16 {
492 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
493 /// # }
494 /// # results.sort();
495 /// # assert_eq!(results, vec![
496 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
497 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
498 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
499 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
500 /// # ]);
501 /// # }));
502 /// ```
503 pub fn broadcast_bincode<L2: 'a>(
504 self,
505 other: &Cluster<'a, L2>,
506 nondet_membership: NonDet,
507 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
508 where
509 T: Clone + Serialize + DeserializeOwned,
510 {
511 let ids = track_membership(self.location.source_cluster_members(other));
512 let join_tick = self.location.tick();
513 let current_members = ids.snapshot(&join_tick, nondet_membership);
514
515 self.batch(&join_tick, nondet_membership)
516 .repeat_with_keys(current_members)
517 .all_ticks()
518 .demux_bincode(other)
519 }
520}
521
522impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
523 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
524{
525 /// Sends elements of this stream at each source member to specific members of a destination
526 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
527 ///
528 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
529 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
530 /// this API allows precise targeting of specific cluster members rather than broadcasting to
531 /// all members.
532 ///
533 /// Each cluster member sends its local stream elements, and they are collected at each
534 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
535 ///
536 /// # Example
537 /// ```rust
538 /// # use hydro_lang::prelude::*;
539 /// # use futures::StreamExt;
540 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
541 /// # type Source = ();
542 /// # type Destination = ();
543 /// let source: Cluster<Source> = flow.cluster::<Source>();
544 /// let to_send: Stream<_, Cluster<_>, _> = source
545 /// .source_iter(q!(vec![0, 1, 2, 3]))
546 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)));
547 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
548 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
549 /// # all_received.entries().send_bincode(&p2).entries()
550 /// # }, |mut stream| async move {
551 /// // if there are 4 members in the destination cluster, each receives one message from each source member
552 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
553 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
554 /// // - ...
555 /// # let mut results = Vec::new();
556 /// # for w in 0..16 {
557 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
558 /// # }
559 /// # results.sort();
560 /// # assert_eq!(results, vec![
561 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
562 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
563 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
564 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
565 /// # ]);
566 /// # }));
567 /// ```
568 pub fn demux_bincode(
569 self,
570 other: &Cluster<'a, L2>,
571 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
572 where
573 T: Serialize + DeserializeOwned,
574 {
575 self.into_keyed().demux_bincode(other)
576 }
577}