1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
//! Organizational module for Hydroflow Send/RecvCtx structs and Input/OutputPort structs.
use std::cell::RefMut;
use std::marker::PhantomData;

use ref_cast::RefCast;
use sealed::sealed;

use super::HandoffId;
use crate::scheduled::graph::Hydroflow;
use crate::scheduled::handoff::{CanReceive, Handoff, TeeingHandoff, TryCanReceive};

/// An empty trait used to denote [`Polarity`]: either **send** or **receive**.
///
/// [`SendPort`] and [`RecvPort`] have identical representations (via [`Port`]) but are not
/// interchangable, so [`SEND`] and [`RECV`] which implement this trait are used to differentiate
/// between the two polarities.
#[sealed]
pub trait Polarity: 'static {}

/// An uninstantiable type used to tag port [`Polarity`] as **send**.
///
/// See also: [`RECV`].
#[expect(clippy::upper_case_acronyms, reason = "marker type")]
pub enum SEND {}
/// An uninstantiable type used to tag port [`Polarity`] as **receive**.
///
/// See also: [`SEND`].
#[expect(clippy::upper_case_acronyms, reason = "marker type")]
pub enum RECV {}
#[sealed]
impl Polarity for SEND {}
#[sealed]
impl Polarity for RECV {}

/// Lightweight ID struct representing an input or output port for a [`Handoff`] added to a
/// [`Hydroflow`] instance..
#[must_use]
pub struct Port<S: Polarity, H>
where
    H: Handoff,
{
    pub(crate) handoff_id: HandoffId,
    #[expect(clippy::type_complexity, reason = "phantom data")]
    pub(crate) _marker: PhantomData<(*const S, fn() -> H)>,
}
/// Send-specific variant of [`Port`]. An output port.
pub type SendPort<H> = Port<SEND, H>;
/// Recv-specific variant of [`Port`]. An input port.
pub type RecvPort<H> = Port<RECV, H>;

/// Methods for [`TeeingHandoff`] teeing and dropping.
impl<T: Clone> RecvPort<TeeingHandoff<T>> {
    /// Tees this [`TeeingHandoff`], given the [`Hydroflow`] instance it belongs to.
    pub fn tee(&self, hf: &mut Hydroflow) -> RecvPort<TeeingHandoff<T>> {
        hf.teeing_handoff_tee(self)
    }

    /// Marks this output of a [`TeeingHandoff`] as dropped so that no more data will be sent to
    /// it, given the [`Hydroflow`] instance it belongs to.
    ///
    /// It is recommended to not not use this method and instead simply avoid teeing a
    /// [`TeeingHandoff`] when it is not needed.
    pub fn drop(self, hf: &mut Hydroflow) {
        hf.teeing_handoff_drop(self)
    }
}

/// Wrapper around a handoff to differentiate between output and input.
#[derive(RefCast)]
#[repr(transparent)]
pub struct PortCtx<S: Polarity, H> {
    pub(crate) handoff: H,
    pub(crate) _marker: PhantomData<*const S>,
}
/// Send-specific [`PortCtx`]. Output to send into a handoff.
pub type SendCtx<H> = PortCtx<SEND, H>;
/// Recv-specific [`PortCtx`]. Input to receive from a handoff.
pub type RecvCtx<H> = PortCtx<RECV, H>;

/// Context provided to a subgraph for reading from a handoff. Corresponds to a [`SendPort`].
impl<H: Handoff> SendCtx<H> {
    /// Alias for [`Handoff::give`] on the inner `H`.
    pub fn give<T>(&self, item: T) -> T
    where
        H: CanReceive<T>,
    {
        <H as CanReceive<T>>::give(&self.handoff, item)
    }

    /// Alias for [`Handoff::try_give`] on the inner `H`.
    pub fn try_give<T>(&self, item: T) -> Result<T, T>
    where
        H: TryCanReceive<T>,
    {
        <H as TryCanReceive<T>>::try_give(&self.handoff, item)
    }
}

/// Context provided to a subgraph for reading from a handoff. Corresponds to a [`RecvPort`].
impl<H: Handoff> RecvCtx<H> {
    /// See [`Handoff::take_inner`].
    pub fn take_inner(&self) -> H::Inner {
        self.handoff.take_inner()
    }

    /// See [`Handoff::borrow_mut_swap`].
    pub fn borrow_mut_swap(&self) -> RefMut<H::Inner> {
        self.handoff.borrow_mut_swap()
    }
}