hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::stream::Retries;
16#[cfg(stageleft_runtime)]
17use crate::location::dynamic::DynLocation;
18use crate::location::external_process::ExternalBincodeStream;
19use crate::location::tick::NoAtomic;
20use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
21use crate::nondet::NonDet;
22use crate::staging_util::get_this_crate;
23
24// same as the one in `hydro_std`, but internal use only
25fn track_membership<'a, C, L: Location<'a> + NoTick + NoAtomic>(
26 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
27) -> KeyedSingleton<MemberId<C>, (), L, Unbounded> {
28 membership
29 .fold(
30 q!(|| false),
31 q!(|present, event| {
32 match event {
33 MembershipEvent::Joined => *present = true,
34 MembershipEvent::Left => *present = false,
35 }
36 }),
37 )
38 .filter_map(q!(|v| if v { Some(()) } else { None }))
39}
40
41fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
42 let root = get_this_crate();
43
44 if is_demux {
45 parse_quote! {
46 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::location::MemberId<_>, #t_type), _>(
47 |(id, data)| {
48 (id.raw_id, #root::runtime_support::bincode::serialize(&data).unwrap().into())
49 }
50 )
51 }
52 } else {
53 parse_quote! {
54 ::#root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
55 |data| {
56 #root::runtime_support::bincode::serialize(&data).unwrap().into()
57 }
58 )
59 }
60 }
61}
62
63pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
64 serialize_bincode_with_type(is_demux, "e_type::<T>())
65}
66
67fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
68 let root = get_this_crate();
69
70 if let Some(c_type) = tagged {
71 parse_quote! {
72 |res| {
73 let (id, b) = res.unwrap();
74 (#root::location::MemberId::<#c_type>::from_raw(id), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
75 }
76 }
77 } else {
78 parse_quote! {
79 |res| {
80 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
81 }
82 }
83 }
84}
85
86pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
87 deserialize_bincode_with_type(tagged, "e_type::<T>())
88}
89
90impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
91 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
92 /// using [`bincode`] to serialize/deserialize messages.
93 ///
94 /// The returned stream captures the elements received at the destination, where values will
95 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
96 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
97 /// recipient is guaranteed to receive a _prefix_ or the sent messages; if the TCP connection is
98 /// dropped no further messages will be sent.
99 ///
100 /// # Example
101 /// ```rust
102 /// # use hydro_lang::prelude::*;
103 /// # use futures::StreamExt;
104 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
105 /// let p1 = flow.process::<()>();
106 /// let numbers: Stream<_, Process<_>, Unbounded> = p1.source_iter(q!(vec![1, 2, 3]));
107 /// let p2 = flow.process::<()>();
108 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
109 /// // 1, 2, 3
110 /// # on_p2.send_bincode(&p_out)
111 /// # }, |mut stream| async move {
112 /// # for w in 1..=3 {
113 /// # assert_eq!(stream.next().await, Some(w));
114 /// # }
115 /// # }));
116 /// ```
117 pub fn send_bincode<L2>(
118 self,
119 other: &Process<'a, L2>,
120 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
121 where
122 T: Serialize + DeserializeOwned,
123 {
124 let serialize_pipeline = Some(serialize_bincode::<T>(false));
125
126 let deserialize_pipeline = Some(deserialize_bincode::<T>(None));
127
128 Stream::new(
129 other.clone(),
130 HydroNode::Network {
131 serialize_fn: serialize_pipeline.map(|e| e.into()),
132 instantiate_fn: DebugInstantiate::Building,
133 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
134 input: Box::new(self.ir_node.into_inner()),
135 metadata: other.new_node_metadata::<T>(),
136 },
137 )
138 }
139
140 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
141 /// using [`bincode`] to serialize/deserialize messages.
142 ///
143 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
144 /// membership information. This is a common pattern in distributed systems for broadcasting data to
145 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
146 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
147 /// each element to all cluster members.
148 ///
149 /// # Non-Determinism
150 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
151 /// to the current cluster members _at that point in time_. Depending on when we are notified of
152 /// membership changes, we will broadcast each element to different members.
153 ///
154 /// # Example
155 /// ```rust
156 /// # use hydro_lang::prelude::*;
157 /// # use futures::StreamExt;
158 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
159 /// let p1 = flow.process::<()>();
160 /// let workers: Cluster<()> = flow.cluster::<()>();
161 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
162 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
163 /// # on_worker.send_bincode(&p2).entries()
164 /// // if there are 4 members in the cluster, each receives one element
165 /// // - MemberId::<()>(0): [123]
166 /// // - MemberId::<()>(1): [123]
167 /// // - MemberId::<()>(2): [123]
168 /// // - MemberId::<()>(3): [123]
169 /// # }, |mut stream| async move {
170 /// # let mut results = Vec::new();
171 /// # for w in 0..4 {
172 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
173 /// # }
174 /// # results.sort();
175 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
176 /// # }));
177 /// ```
178 pub fn broadcast_bincode<L2: 'a>(
179 self,
180 other: &Cluster<'a, L2>,
181 nondet_membership: NonDet,
182 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
183 where
184 T: Clone + Serialize + DeserializeOwned,
185 {
186 let ids = track_membership(self.location.source_cluster_members(other));
187 let join_tick = self.location.tick();
188 let current_members = ids.snapshot(&join_tick, nondet_membership);
189
190 self.batch(&join_tick, nondet_membership)
191 .repeat_with_keys(current_members)
192 .all_ticks()
193 .demux_bincode(other)
194 }
195
196 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
197 /// serialization. The external process can receive these elements by establishing a TCP
198 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
199 ///
200 /// # Example
201 /// ```rust
202 /// # use hydro_lang::prelude::*;
203 /// # use futures::StreamExt;
204 /// # tokio_test::block_on(async move {
205 /// let flow = FlowBuilder::new();
206 /// let process = flow.process::<()>();
207 /// let numbers: Stream<_, Process<_>, Unbounded> = process.source_iter(q!(vec![1, 2, 3]));
208 /// let external = flow.external::<()>();
209 /// let external_handle = numbers.send_bincode_external(&external);
210 ///
211 /// let mut deployment = hydro_deploy::Deployment::new();
212 /// let nodes = flow
213 /// .with_process(&process, deployment.Localhost())
214 /// .with_external(&external, deployment.Localhost())
215 /// .deploy(&mut deployment);
216 ///
217 /// deployment.deploy().await.unwrap();
218 /// // establish the TCP connection
219 /// let mut external_recv_stream = nodes.connect_source_bincode(external_handle).await;
220 /// deployment.start().await.unwrap();
221 ///
222 /// for w in 1..=3 {
223 /// assert_eq!(external_recv_stream.next().await, Some(w));
224 /// }
225 /// # });
226 /// ```
227 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T>
228 where
229 T: Serialize + DeserializeOwned,
230 {
231 let serialize_pipeline = Some(serialize_bincode::<T>(false));
232
233 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
234
235 let external_key = flow_state_borrow.next_external_out;
236 flow_state_borrow.next_external_out += 1;
237
238 flow_state_borrow.push_root(HydroRoot::SendExternal {
239 to_external_id: other.id,
240 to_key: external_key,
241 to_many: false,
242 serialize_fn: serialize_pipeline.map(|e| e.into()),
243 instantiate_fn: DebugInstantiate::Building,
244 input: Box::new(HydroNode::Unpersist {
245 inner: Box::new(self.ir_node.into_inner()),
246 metadata: self.location.new_node_metadata::<T>(),
247 }),
248 op_metadata: HydroIrOpMetadata::new(),
249 });
250
251 ExternalBincodeStream {
252 process_id: other.id,
253 port_id: external_key,
254 _phantom: PhantomData,
255 }
256 }
257}
258
259impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
260 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
261{
262 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
263 /// using [`bincode`] to serialize/deserialize messages.
264 ///
265 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
266 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
267 /// this API allows precise targeting of specific cluster members rather than broadcasting to
268 /// all members.
269 ///
270 /// # Example
271 /// ```rust
272 /// # use hydro_lang::prelude::*;
273 /// # use futures::StreamExt;
274 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
275 /// let p1 = flow.process::<()>();
276 /// let workers: Cluster<()> = flow.cluster::<()>();
277 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
278 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
279 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)))
280 /// .demux_bincode(&workers);
281 /// # on_worker.send_bincode(&p2).entries()
282 /// // if there are 4 members in the cluster, each receives one element
283 /// // - MemberId::<()>(0): [0]
284 /// // - MemberId::<()>(1): [1]
285 /// // - MemberId::<()>(2): [2]
286 /// // - MemberId::<()>(3): [3]
287 /// # }, |mut stream| async move {
288 /// # let mut results = Vec::new();
289 /// # for w in 0..4 {
290 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
291 /// # }
292 /// # results.sort();
293 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
294 /// # }));
295 /// ```
296 pub fn demux_bincode(
297 self,
298 other: &Cluster<'a, L2>,
299 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
300 where
301 T: Serialize + DeserializeOwned,
302 {
303 self.into_keyed().demux_bincode(other)
304 }
305}
306
307impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
308 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
309 /// [`bincode`] to serialize/deserialize messages.
310 ///
311 /// This provides load balancing by evenly distributing work across cluster members. The
312 /// distribution is deterministic based on element order - the first element goes to member 0,
313 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
314 ///
315 /// # Non-Determinism
316 /// The set of cluster members may asynchronously change over time. Each element is distributed
317 /// based on the current cluster membership _at that point in time_. Depending on when cluster
318 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
319 /// membership is stable, the order of members in the round-robin pattern may change across runs.
320 ///
321 /// # Ordering Requirements
322 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
323 /// order of messages and retries affects the round-robin pattern.
324 ///
325 /// # Example
326 /// ```rust
327 /// # use hydro_lang::prelude::*;
328 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
329 /// # use futures::StreamExt;
330 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
331 /// let p1 = flow.process::<()>();
332 /// let workers: Cluster<()> = flow.cluster::<()>();
333 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
334 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
335 /// on_worker.send_bincode(&p2)
336 /// # .first().values() // we use first to assert that each member gets one element
337 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
338 /// // - MemberId::<()>(?): [1]
339 /// // - MemberId::<()>(?): [2]
340 /// // - MemberId::<()>(?): [3]
341 /// // - MemberId::<()>(?): [4]
342 /// # }, |mut stream| async move {
343 /// # let mut results = Vec::new();
344 /// # for w in 0..4 {
345 /// # results.push(stream.next().await.unwrap());
346 /// # }
347 /// # results.sort();
348 /// # assert_eq!(results, vec![1, 2, 3, 4]);
349 /// # }));
350 /// ```
351 pub fn round_robin_bincode<L2: 'a>(
352 self,
353 other: &Cluster<'a, L2>,
354 nondet_membership: NonDet,
355 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
356 where
357 T: Serialize + DeserializeOwned,
358 {
359 let ids = track_membership(self.location.source_cluster_members(other));
360 let join_tick = self.location.tick();
361 let current_members = ids
362 .snapshot(&join_tick, nondet_membership)
363 .keys()
364 .assume_ordering(nondet_membership)
365 .collect_vec();
366
367 self.enumerate()
368 .batch(&join_tick, nondet_membership)
369 .cross_singleton(current_members)
370 .map(q!(|(data, members)| (
371 members[data.0 % members.len()],
372 data.1
373 )))
374 .all_ticks()
375 .demux_bincode(other)
376 }
377}
378
379impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
380 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
381 /// using [`bincode`] to serialize/deserialize messages.
382 ///
383 /// Each cluster member sends its local stream elements, and they are collected at the destination
384 /// as a [`KeyedStream`] where keys identify the source cluster member.
385 ///
386 /// # Example
387 /// ```rust
388 /// # use hydro_lang::prelude::*;
389 /// # use futures::StreamExt;
390 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
391 /// let workers: Cluster<()> = flow.cluster::<()>();
392 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
393 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
394 /// # all_received.entries()
395 /// # }, |mut stream| async move {
396 /// // if there are 4 members in the cluster, we should receive 4 elements
397 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
398 /// # let mut results = Vec::new();
399 /// # for w in 0..4 {
400 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
401 /// # }
402 /// # results.sort();
403 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
404 /// # }));
405 /// ```
406 ///
407 /// If you don't need to know the source for each element, you can use `.values()`
408 /// to get just the data:
409 /// ```rust
410 /// # use hydro_lang::prelude::*;
411 /// # use hydro_lang::live_collections::stream::NoOrder;
412 /// # use futures::StreamExt;
413 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
414 /// # let workers: Cluster<()> = flow.cluster::<()>();
415 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
416 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
417 /// # values
418 /// # }, |mut stream| async move {
419 /// # let mut results = Vec::new();
420 /// # for w in 0..4 {
421 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
422 /// # }
423 /// # results.sort();
424 /// // if there are 4 members in the cluster, we should receive 4 elements
425 /// // 1, 1, 1, 1
426 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
427 /// # }));
428 /// ```
429 pub fn send_bincode<L2>(
430 self,
431 other: &Process<'a, L2>,
432 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
433 where
434 T: Serialize + DeserializeOwned,
435 {
436 let serialize_pipeline = Some(serialize_bincode::<T>(false));
437
438 let deserialize_pipeline = Some(deserialize_bincode::<T>(Some("e_type::<L>())));
439
440 let raw_stream: Stream<(MemberId<L>, T), Process<'a, L2>, Unbounded, O, R> = Stream::new(
441 other.clone(),
442 HydroNode::Network {
443 serialize_fn: serialize_pipeline.map(|e| e.into()),
444 instantiate_fn: DebugInstantiate::Building,
445 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
446 input: Box::new(self.ir_node.into_inner()),
447 metadata: other.new_node_metadata::<(MemberId<L>, T)>(),
448 },
449 );
450
451 raw_stream.into_keyed()
452 }
453
454 /// Broadcasts elements of this stream at each source member to all members of a destination
455 /// cluster, using [`bincode`] to serialize/deserialize messages.
456 ///
457 /// Each source member sends each of its stream elements to **every** member of the cluster
458 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
459 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
460 /// **only data elements** and sends each element to all cluster members.
461 ///
462 /// # Non-Determinism
463 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
464 /// to the current cluster members known _at that point in time_ at the source member. Depending
465 /// on when each source member is notified of membership changes, it will broadcast each element
466 /// to different members.
467 ///
468 /// # Example
469 /// ```rust
470 /// # use hydro_lang::prelude::*;
471 /// # use hydro_lang::location::MemberId;
472 /// # use futures::StreamExt;
473 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
474 /// # type Source = ();
475 /// # type Destination = ();
476 /// let source: Cluster<Source> = flow.cluster::<Source>();
477 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
478 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
479 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
480 /// # on_destination.entries().send_bincode(&p2).entries()
481 /// // if there are 4 members in the desination, each receives one element from each source member
482 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
483 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
484 /// // - ...
485 /// # }, |mut stream| async move {
486 /// # let mut results = Vec::new();
487 /// # for w in 0..16 {
488 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
489 /// # }
490 /// # results.sort();
491 /// # assert_eq!(results, vec![
492 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
493 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
494 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
495 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
496 /// # ]);
497 /// # }));
498 /// ```
499 pub fn broadcast_bincode<L2: 'a>(
500 self,
501 other: &Cluster<'a, L2>,
502 nondet_membership: NonDet,
503 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
504 where
505 T: Clone + Serialize + DeserializeOwned,
506 {
507 let ids = track_membership(self.location.source_cluster_members(other));
508 let join_tick = self.location.tick();
509 let current_members = ids.snapshot(&join_tick, nondet_membership);
510
511 self.batch(&join_tick, nondet_membership)
512 .repeat_with_keys(current_members)
513 .all_ticks()
514 .demux_bincode(other)
515 }
516}
517
518impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
519 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
520{
521 /// Sends elements of this stream at each source member to specific members of a destination
522 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
523 ///
524 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
525 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
526 /// this API allows precise targeting of specific cluster members rather than broadcasting to
527 /// all members.
528 ///
529 /// Each cluster member sends its local stream elements, and they are collected at each
530 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
531 ///
532 /// # Example
533 /// ```rust
534 /// # use hydro_lang::prelude::*;
535 /// # use futures::StreamExt;
536 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
537 /// # type Source = ();
538 /// # type Destination = ();
539 /// let source: Cluster<Source> = flow.cluster::<Source>();
540 /// let to_send: Stream<_, Cluster<_>, _> = source
541 /// .source_iter(q!(vec![0, 1, 2, 3]))
542 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw(x), x)));
543 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
544 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
545 /// # all_received.entries().send_bincode(&p2).entries()
546 /// # }, |mut stream| async move {
547 /// // if there are 4 members in the destination cluster, each receives one message from each source member
548 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
549 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
550 /// // - ...
551 /// # let mut results = Vec::new();
552 /// # for w in 0..16 {
553 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
554 /// # }
555 /// # results.sort();
556 /// # assert_eq!(results, vec![
557 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
558 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
559 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
560 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
561 /// # ]);
562 /// # }));
563 /// ```
564 pub fn demux_bincode(
565 self,
566 other: &Cluster<'a, L2>,
567 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
568 where
569 T: Serialize + DeserializeOwned,
570 {
571 self.into_keyed().demux_bincode(other)
572 }
573}