use std::fmt::Debug;
use std::marker::PhantomData;
use std::time::Duration;
use dfir_rs::futures::stream::Stream as FuturesStream;
use dfir_rs::{tokio, tokio_stream};
use proc_macro2::Span;
use stageleft::{q, QuotedWithContext};
use super::builder::FlowState;
use crate::cycle::{CycleCollection, ForwardRef, ForwardRefMarker};
use crate::ir::{HydroNode, HydroSource};
use crate::{Singleton, Stream, Unbounded};
pub mod external_process;
pub use external_process::ExternalProcess;
pub mod process;
pub use process::Process;
pub mod cluster;
pub use cluster::{Cluster, ClusterId};
pub mod can_send;
pub use can_send::CanSend;
pub mod tick;
pub use tick::{NoTick, Tick, Timestamped};
/// Identifier of a location in the flow graph.
///
/// Every variant carries a `usize`; [`LocationId::Tick`] additionally boxes
/// the id of the location it is nested in, so ids can form a chain that is
/// resolved by [`LocationId::root`].
#[derive(PartialEq, Eq, Clone, Debug)]
pub enum LocationId {
    /// A process location; the payload is what [`LocationId::raw_id`] returns.
    Process(usize),
    /// A cluster location; the payload is what [`LocationId::raw_id`] returns.
    Cluster(usize),
    /// A tick nested inside another location.
    ///
    /// The first field is the tick's own id (presumably its clock id -- confirm
    /// against the `Tick` implementation); the second is the enclosing location.
    Tick(usize, Box<LocationId>),
    /// An external process location; the payload is what
    /// [`LocationId::raw_id`] returns.
    ExternalProcess(usize),
}

impl LocationId {
    /// Returns the outermost non-tick id that this id is nested in.
    ///
    /// Non-tick ids are their own root. For a `Tick`, the chain of enclosing
    /// locations is followed until a non-tick id is reached.
    pub fn root(&self) -> &LocationId {
        let mut current = self;
        // Peel off tick layers one at a time; the first non-tick id is the root.
        while let LocationId::Tick(_, enclosing) = current {
            current = &**enclosing;
        }
        current
    }

    /// Returns the numeric id stored in a non-tick variant.
    ///
    /// # Panics
    ///
    /// Panics with `"cannot get raw id for tick"` when called on a
    /// [`LocationId::Tick`]. Call [`LocationId::root`] first if the id of the
    /// enclosing location is what is needed.
    pub fn raw_id(&self) -> usize {
        match self {
            LocationId::Process(raw)
            | LocationId::Cluster(raw)
            | LocationId::ExternalProcess(raw) => *raw,
            LocationId::Tick(..) => panic!("cannot get raw id for tick"),
        }
    }
}
/// Asserts that two locations report the same [`LocationId`].
///
/// # Panics
///
/// Panics with the message `"locations do not match"` (followed by both ids)
/// when `l1.id()` and `l2.id()` differ.
pub fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
    let first_id = l1.id();
    let second_id = l2.id();
    assert_eq!(first_id, second_id, "locations do not match");
}
/// A location in the flow graph on which streams and singletons can be
/// created.
///
/// Implementors supply the identity of the location ([`Location::id`]) and
/// access to the shared [`FlowState`]; the provided methods use those to
/// build source nodes, allocate ticks, and create forward references.
pub trait Location<'a>: Clone {
    /// The location type returned by [`Location::root`].
    type Root: Location<'a>;

    /// Returns the root location of this location.
    ///
    /// NOTE(review): presumably the non-tick location that encloses `self`,
    /// mirroring [`LocationId::root`] -- confirm against the implementors.
    fn root(&self) -> Self::Root;

    /// Returns the [`LocationId`] identifying this location.
    fn id(&self) -> LocationId;

    /// Returns the flow state that the provided methods use to allocate
    /// clock ids ([`Location::tick`]) and cycle ids ([`Location::forward_ref`]).
    fn flow_state(&self) -> &FlowState;

    /// Reports whether this kind of location is top-level.
    ///
    /// NOTE(review): the exact meaning is defined by the implementors, which
    /// are not visible here -- presumably `false` for tick-nested locations.
    fn is_top_level() -> bool;

    /// Creates a new [`Tick`] on this location.
    ///
    /// The tick receives the flow state's current `next_clock_id`, and the
    /// counter is incremented so that the next call gets a fresh id.
    fn tick(&self) -> Tick<Self>
    where
        Self: NoTick,
    {
        // Read and bump the counter under a single borrow, released before
        // `self` is cloned below.
        let next_id = {
            let mut flow_state = self.flow_state().borrow_mut();
            let id = flow_state.next_clock_id;
            flow_state.next_clock_id += 1;
            id
        };

        Tick {
            id: next_id,
            l: self.clone(),
        }
    }

    /// Creates a stream of `()` on this location backed by a
    /// [`HydroSource::Spin`] source wrapped in a persist node.
    fn spin(&self) -> Stream<(), Self, Unbounded>
    where
        Self: Sized + NoTick,
    {
        Stream::new(
            self.clone(),
            HydroNode::Persist(Box::new(HydroNode::Source {
                source: HydroSource::Spin(),
                location_kind: self.id(),
            })),
        )
    }

    /// Creates a stream on this location from a quoted expression that
    /// evaluates to a futures stream yielding `T`.
    ///
    /// The expression is spliced in the context of this location and used as
    /// a [`HydroSource::Stream`] source wrapped in a persist node.
    fn source_stream<T, E: FuturesStream<Item = T> + Unpin>(
        &self,
        e: impl QuotedWithContext<'a, E, Self>,
    ) -> Stream<T, Self, Unbounded>
    where
        Self: Sized + NoTick,
    {
        let e = e.splice_untyped_ctx(self);

        Stream::new(
            self.clone(),
            HydroNode::Persist(Box::new(HydroNode::Source {
                source: HydroSource::Stream(e.into()),
                location_kind: self.id(),
            })),
        )
    }

    /// Creates a stream on this location from a quoted expression that
    /// evaluates to something iterable over `T`.
    ///
    /// The expression is spliced in the context of this location and used as
    /// a [`HydroSource::Iter`] source wrapped in a persist node.
    fn source_iter<T, E: IntoIterator<Item = T>>(
        &self,
        e: impl QuotedWithContext<'a, E, Self>,
    ) -> Stream<T, Self, Unbounded>
    where
        Self: Sized + NoTick,
    {
        let e = e.splice_untyped_ctx(self);

        Stream::new(
            self.clone(),
            HydroNode::Persist(Box::new(HydroNode::Source {
                source: HydroSource::Iter(e.into()),
                location_kind: self.id(),
            })),
        )
    }

    /// Creates a singleton on this location from a quoted expression that
    /// evaluates to a `T`.
    ///
    /// The value is wrapped in a one-element array and fed through a
    /// [`HydroSource::Iter`] source.
    fn singleton<T: Clone>(
        &self,
        e: impl QuotedWithContext<'a, T, Self>,
    ) -> Singleton<T, Self, Unbounded>
    where
        Self: Sized + NoTick,
    {
        let e_arr = q!([e]);
        let e = e_arr.splice_untyped_ctx(self);

        // The source is deliberately wrapped in two nested persist nodes.
        // NOTE(review): presumably the inner one retains the value and the
        // outer one re-emits it on every tick -- confirm before simplifying.
        Singleton::new(
            self.clone(),
            HydroNode::Persist(Box::new(HydroNode::Persist(Box::new(HydroNode::Source {
                source: HydroSource::Iter(e.into()),
                location_kind: self.id(),
            })))),
        )
    }

    /// Creates a stream of [`tokio::time::Instant`]s produced by a
    /// `tokio::time::interval` timer with the given period.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not stated in this file. Presumably this
    /// is `unsafe` because the emitted instants come from a wall-clock timer
    /// and are therefore non-deterministic -- confirm and document the exact
    /// obligation on callers.
    unsafe fn source_interval(
        &self,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
    ) -> Stream<tokio::time::Instant, Self, Unbounded>
    where
        Self: Sized + NoTick,
    {
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval(interval)
        )))
    }

    /// Creates a stream of [`tokio::time::Instant`]s produced by a
    /// `tokio::time::interval_at` timer whose first tick is scheduled `delay`
    /// after `tokio::time::Instant::now()` is evaluated, and which then
    /// repeats with period `interval`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the contract is not stated in this file. Presumably this
    /// is `unsafe` for the same reason as [`Location::source_interval`]
    /// (timer-driven, non-deterministic output) -- confirm and document the
    /// exact obligation on callers.
    unsafe fn source_interval_delayed(
        &self,
        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
    ) -> Stream<tokio::time::Instant, Self, Unbounded>
    where
        Self: Sized + NoTick,
    {
        self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
            tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
        )))
    }

    /// Creates a forward reference on this location.
    ///
    /// Returns the [`ForwardRef`] handle together with the collection `S`
    /// created from it via `S::create_source`. Both share an identifier of the
    /// form `cycle_<n>`, where `<n>` is a counter kept per location id in the
    /// flow state's `cycle_counts`.
    ///
    /// # Panics
    ///
    /// Panics if [`Location::id`] returns a [`LocationId::Tick`] or a
    /// [`LocationId::ExternalProcess`].
    fn forward_ref<S: CycleCollection<'a, ForwardRefMarker, Location = Self>>(
        &self,
    ) -> (ForwardRef<'a, S>, S)
    where
        Self: NoTick,
    {
        let next_id = {
            let on_id = match self.id() {
                LocationId::Process(id) | LocationId::Cluster(id) => id,
                LocationId::Tick(_, _) => {
                    panic!("cannot create a forward reference on a tick location")
                }
                LocationId::ExternalProcess(_) => {
                    panic!("cannot create a forward reference on an external process")
                }
            };

            // Take the current per-location cycle count and advance it under
            // one borrow, released at the end of this block.
            let mut flow_state = self.flow_state().borrow_mut();
            let next_id_entry = flow_state.cycle_counts.entry(on_id).or_default();

            let id = *next_id_entry;
            *next_id_entry += 1;
            id
        };

        let ident = syn::Ident::new(&format!("cycle_{}", next_id), Span::call_site());

        (
            ForwardRef {
                ident: ident.clone(),
                expected_location: self.id(),
                _phantom: PhantomData,
            },
            S::create_source(ident, self.clone()),
        )
    }
}