hydro_cli/
lib.rs

1#![expect(
2    unused_qualifications,
3    non_local_definitions,
4    unsafe_op_in_unsafe_fn,
5    reason = "for pyo3 generated code"
6)]
7
8use core::rust_crate::ports::RustCrateSource;
9use std::cell::OnceCell;
10use std::collections::HashMap;
11use std::ops::Deref;
12use std::pin::Pin;
13use std::sync::{Arc, OnceLock};
14
15use bytes::Bytes;
16use futures::{Future, SinkExt, StreamExt};
17use hydro_deploy_integration::{
18    ConnectedDirect, ConnectedSink, ConnectedSource, Connection, DynSink, DynStream,
19};
20use pyo3::exceptions::{PyException, PyStopAsyncIteration};
21use pyo3::prelude::*;
22use pyo3::types::{PyBytes, PyDict};
23use pyo3::{create_exception, wrap_pymodule};
24use pyo3_asyncio::TaskLocals;
25use pythonize::pythonize;
26use tokio::sync::oneshot::Sender;
27use tokio::sync::{Mutex, RwLock};
28
29mod cli;
30use hydro_deploy::ssh::LaunchedSshHost;
31use hydro_deploy::{self as core};
32
33static TOKIO_RUNTIME: std::sync::RwLock<Option<tokio::runtime::Runtime>> =
34    std::sync::RwLock::new(None);
35
36#[pyfunction]
37fn cleanup_runtime() {
38    drop(TOKIO_RUNTIME.write().unwrap().take());
39}
40
41struct TokioRuntime {}
42
43impl pyo3_asyncio::generic::Runtime for TokioRuntime {
44    type JoinError = tokio::task::JoinError;
45    type JoinHandle = tokio::task::JoinHandle<()>;
46
47    fn spawn<F>(fut: F) -> Self::JoinHandle
48    where
49        F: Future<Output = ()> + Send + 'static,
50    {
51        TOKIO_RUNTIME
52            .read()
53            .unwrap()
54            .as_ref()
55            .unwrap()
56            .spawn(async move {
57                fut.await;
58            })
59    }
60}
61
62tokio::task_local! {
63    static TASK_LOCALS: OnceCell<TaskLocals>;
64}
65
66impl pyo3_asyncio::generic::ContextExt for TokioRuntime {
67    fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
68    where
69        F: Future<Output = R> + Send + 'static,
70    {
71        let cell = OnceCell::new();
72        cell.set(locals).unwrap();
73
74        Box::pin(TASK_LOCALS.scope(cell, fut))
75    }
76
77    fn get_task_locals() -> Option<TaskLocals> {
78        TASK_LOCALS
79            .try_with(|c| c.get().cloned())
80            .unwrap_or_default()
81    }
82}
83
84create_exception!(hydro_cli_core, AnyhowError, PyException);
85
86#[pyclass]
87struct SafeCancelToken {
88    cancel_tx: Option<Sender<()>>,
89}
90
91#[pymethods]
92impl SafeCancelToken {
93    fn safe_cancel(&mut self) {
94        if let Some(token) = self.cancel_tx.take() {
95            let _ = token.send(());
96        }
97    }
98}
99
100static CONVERTERS_MODULE: OnceLock<Py<PyModule>> = OnceLock::new();
101
102fn interruptible_future_to_py<F, T>(py: Python<'_>, fut: F) -> PyResult<&PyAny>
103where
104    F: Future<Output = PyResult<T>> + Send + 'static,
105    T: IntoPy<PyObject>,
106{
107    let module = CONVERTERS_MODULE.get().unwrap().clone().into_ref(py);
108
109    let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
110
111    let base_coro = pyo3_asyncio::generic::future_into_py::<TokioRuntime, _, _>(py, async move {
112        tokio::select! {
113            biased;
114            _ = cancel_rx => Ok(None),
115            r = fut => r.map(|o| Some(o))
116        }
117    })?;
118
119    module.call_method1(
120        "coroutine_to_safely_cancellable",
121        (
122            base_coro,
123            SafeCancelToken {
124                cancel_tx: Some(cancel_tx),
125            },
126        ),
127    )
128}
129
130#[pyclass]
131#[derive(Clone)]
132pub struct AnyhowWrapper {
133    pub underlying: Arc<RwLock<Option<anyhow::Error>>>,
134}
135
136#[pymethods]
137impl AnyhowWrapper {
138    fn __str__(&self) -> PyResult<String> {
139        Ok(format!(
140            "{:?}",
141            self.underlying.try_read().unwrap().as_ref().unwrap()
142        ))
143    }
144}
145
146#[pyclass(subclass)]
147#[derive(Clone)]
148struct HydroflowSink {
149    underlying: Arc<dyn core::rust_crate::ports::RustCrateSink>,
150}
151
152#[pyclass(name = "Deployment")]
153pub struct Deployment {
154    underlying: Arc<RwLock<core::Deployment>>,
155}
156
157#[pymethods]
158impl Deployment {
159    #[new]
160    fn new() -> Self {
161        Deployment {
162            underlying: Arc::new(RwLock::new(core::Deployment::new())),
163        }
164    }
165
166    #[expect(non_snake_case, reason = "pymethods")]
167    fn Localhost(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
168        let arc = self.underlying.blocking_read().Localhost();
169
170        Ok(Py::new(
171            py,
172            PyClassInitializer::from(Host {
173                underlying: arc.clone(),
174            })
175            .add_subclass(LocalhostHost { underlying: arc }),
176        )?
177        .into_py(py))
178    }
179
180    #[expect(non_snake_case, clippy::too_many_arguments, reason = "pymethods")]
181    fn GcpComputeEngineHost(
182        &self,
183        py: Python<'_>,
184        project: String,
185        machine_type: String,
186        image: String,
187        region: String,
188        network: GcpNetwork,
189        user: Option<String>,
190        startup_script: Option<String>,
191    ) -> PyResult<Py<PyAny>> {
192        let arc = self.underlying.blocking_write().add_host(|id| {
193            core::GcpComputeEngineHost::new(
194                id,
195                project,
196                machine_type,
197                image,
198                region,
199                network.underlying,
200                user,
201                startup_script,
202            )
203        });
204
205        Ok(Py::new(
206            py,
207            PyClassInitializer::from(Host {
208                underlying: arc.clone(),
209            })
210            .add_subclass(GcpComputeEngineHost { underlying: arc }),
211        )?
212        .into_py(py))
213    }
214
215    #[expect(non_snake_case, clippy::too_many_arguments, reason = "pymethods")]
216    fn AzureHost(
217        &self,
218        py: Python<'_>,
219        project: String,
220        os_type: String, // linux or windows
221        machine_size: String,
222        region: String,
223        image: Option<HashMap<String, String>>,
224        user: Option<String>,
225    ) -> PyResult<Py<PyAny>> {
226        let arc = self.underlying.blocking_write().add_host(|id| {
227            core::AzureHost::new(id, project, os_type, machine_size, image, region, user)
228        });
229
230        Ok(Py::new(
231            py,
232            PyClassInitializer::from(Host {
233                underlying: arc.clone(),
234            })
235            .add_subclass(AzureHost { underlying: arc }),
236        )?
237        .into_py(py))
238    }
239
240    #[expect(non_snake_case, reason = "pymethods")]
241    fn CustomService(
242        &self,
243        py: Python<'_>,
244        on: &Host,
245        external_ports: Vec<u16>,
246    ) -> PyResult<Py<PyAny>> {
247        let service = self
248            .underlying
249            .blocking_write()
250            .CustomService(on.underlying.clone(), external_ports);
251
252        Ok(Py::new(
253            py,
254            PyClassInitializer::from(Service {
255                underlying: service.clone(),
256            })
257            .add_subclass(CustomService {
258                underlying: service,
259            }),
260        )?
261        .into_py(py))
262    }
263
264    #[expect(non_snake_case, clippy::too_many_arguments, reason = "pymethods")]
265    fn HydroflowCrate(
266        &self,
267        py: Python<'_>,
268        src: String,
269        on: &Host,
270        bin: Option<String>,
271        example: Option<String>,
272        profile: Option<String>,
273        features: Option<Vec<String>>,
274        args: Option<Vec<String>>,
275        display_id: Option<String>,
276        external_ports: Option<Vec<u16>>,
277    ) -> PyResult<Py<PyAny>> {
278        let service = self.underlying.blocking_write().add_service(|id| {
279            core::rust_crate::RustCrateService::new(
280                id,
281                src.into(),
282                on.underlying.clone(),
283                bin,
284                example,
285                profile,
286                None,  // Python API doesn't support rustflags
287                None,  // Python API doesn't support target_dir
288                false, // Python API doesn't support no_default_features
289                None,  // Python API doesn't support perf
290                features,
291                args,
292                display_id,
293                external_ports.unwrap_or_default(),
294            )
295        });
296
297        Ok(Py::new(
298            py,
299            PyClassInitializer::from(Service {
300                underlying: service.clone(),
301            })
302            .add_subclass(HydroflowCrate {
303                underlying: service,
304            }),
305        )?
306        .into_py(py))
307    }
308
309    fn deploy<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
310        let underlying = self.underlying.clone();
311        let py_none = py.None();
312        interruptible_future_to_py(py, async move {
313            underlying.write().await.deploy().await.map_err(|e| {
314                AnyhowError::new_err(AnyhowWrapper {
315                    underlying: Arc::new(RwLock::new(Some(e))),
316                })
317            })?;
318            Ok(py_none)
319        })
320    }
321
322    fn start<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
323        let underlying = self.underlying.clone();
324        let py_none = py.None();
325        interruptible_future_to_py(py, async move {
326            underlying.write().await.start().await.map_err(|e| {
327                AnyhowError::new_err(AnyhowWrapper {
328                    underlying: Arc::new(RwLock::new(Some(e))),
329                })
330            })?;
331            Ok(py_none)
332        })
333    }
334}
335
336#[pyclass(subclass)]
337pub struct Host {
338    underlying: Arc<dyn core::Host>,
339}
340
341#[pyclass(extends=Host, subclass)]
342struct LocalhostHost {
343    underlying: Arc<core::LocalhostHost>,
344}
345
346#[pymethods]
347impl LocalhostHost {
348    fn client_only(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
349        let arc = Arc::new(self.underlying.client_only());
350
351        Ok(Py::new(
352            py,
353            PyClassInitializer::from(Host {
354                underlying: arc.clone(),
355            })
356            .add_subclass(LocalhostHost { underlying: arc }),
357        )?
358        .into_py(py))
359    }
360}
361
362#[pyclass]
363#[derive(Clone)]
364struct GcpNetwork {
365    underlying: Arc<RwLock<core::gcp::GcpNetwork>>,
366}
367
368#[pymethods]
369impl GcpNetwork {
370    #[new]
371    fn new(project: String, existing: Option<String>) -> Self {
372        GcpNetwork {
373            underlying: Arc::new(RwLock::new(core::gcp::GcpNetwork::new(project, existing))),
374        }
375    }
376}
377
378#[pyclass(extends=Host, subclass)]
379struct GcpComputeEngineHost {
380    underlying: Arc<core::GcpComputeEngineHost>,
381}
382
383#[pymethods]
384impl GcpComputeEngineHost {
385    #[getter]
386    fn internal_ip(&self) -> String {
387        self.underlying.launched.get().unwrap().internal_ip.clone()
388    }
389
390    #[getter]
391    fn external_ip(&self) -> Option<String> {
392        self.underlying.launched.get().unwrap().external_ip.clone()
393    }
394
395    #[getter]
396    fn ssh_key_path(&self) -> String {
397        self.underlying
398            .launched
399            .get()
400            .unwrap()
401            .ssh_key_path()
402            .to_str()
403            .unwrap()
404            .to_string()
405    }
406}
407
408#[pyclass(extends=Host, subclass)]
409struct AzureHost {
410    underlying: Arc<core::AzureHost>,
411}
412
413#[pymethods]
414impl AzureHost {
415    #[getter]
416    fn internal_ip(&self) -> String {
417        self.underlying.launched.get().unwrap().internal_ip.clone()
418    }
419
420    #[getter]
421    fn external_ip(&self) -> Option<String> {
422        self.underlying.launched.get().unwrap().external_ip.clone()
423    }
424
425    #[getter]
426    fn ssh_key_path(&self) -> String {
427        self.underlying
428            .launched
429            .get()
430            .unwrap()
431            .ssh_key_path()
432            .to_str()
433            .unwrap()
434            .to_string()
435    }
436}
437
438#[pyclass(subclass)]
439pub struct Service {
440    underlying: Arc<RwLock<dyn core::Service>>,
441}
442
443#[pymethods]
444impl Service {
445    fn stop<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
446        let underlying = self.underlying.clone();
447        let py_none = py.None();
448        interruptible_future_to_py(py, async move {
449            underlying.write().await.stop().await.unwrap();
450            Ok(py_none)
451        })
452    }
453}
454
455#[pyclass]
456struct PyReceiver {
457    receiver: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<String>>>,
458}
459
460#[pymethods]
461impl PyReceiver {
462    fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
463        slf
464    }
465
466    fn __anext__<'p>(&self, py: Python<'p>) -> Option<&'p PyAny> {
467        let receiver = self.receiver.clone();
468        Some(
469            interruptible_future_to_py(py, async move {
470                receiver
471                    .lock()
472                    .await
473                    .recv()
474                    .await
475                    .ok_or_else(|| PyStopAsyncIteration::new_err(()))
476            })
477            .unwrap(),
478        )
479    }
480}
481
482#[pyclass(extends=Service, subclass)]
483struct CustomService {
484    underlying: Arc<RwLock<core::CustomService>>,
485}
486
487#[pymethods]
488impl CustomService {
489    fn client_port(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
490        let arc = Arc::new(core::custom_service::CustomClientPort::new(Arc::downgrade(
491            &self.underlying,
492        )));
493
494        Ok(Py::new(
495            py,
496            PyClassInitializer::from(HydroflowSink {
497                underlying: arc.clone(),
498            })
499            .add_subclass(CustomClientPort { underlying: arc }),
500        )?
501        .into_py(py))
502    }
503}
504
505#[pyclass(extends=HydroflowSink, subclass)]
506#[derive(Clone)]
507struct CustomClientPort {
508    underlying: Arc<core::custom_service::CustomClientPort>,
509}
510
511#[pymethods]
512impl CustomClientPort {
513    fn send_to(&self, to: &HydroflowSink) {
514        self.underlying.send_to(to.underlying.deref());
515    }
516
517    fn tagged(&self, tag: u32) -> TaggedSource {
518        TaggedSource {
519            underlying: Arc::new(core::rust_crate::ports::TaggedSource {
520                source: self.underlying.clone(),
521                tag,
522            }),
523        }
524    }
525
526    fn server_port<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
527        let underlying = self.underlying.clone();
528        interruptible_future_to_py(py, async move {
529            Ok(ServerPort {
530                underlying: underlying.server_port().await,
531            })
532        })
533    }
534}
535
536#[pyclass(extends=Service, subclass)]
537struct HydroflowCrate {
538    underlying: Arc<RwLock<core::rust_crate::RustCrateService>>,
539}
540
541#[pymethods]
542impl HydroflowCrate {
543    fn stdout<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
544        let underlying = self.underlying.clone();
545        interruptible_future_to_py(py, async move {
546            let underlying = underlying.read().await;
547            Ok(PyReceiver {
548                receiver: Arc::new(Mutex::new(underlying.stdout())),
549            })
550        })
551    }
552
553    fn stderr<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
554        let underlying = self.underlying.clone();
555        interruptible_future_to_py(py, async move {
556            let underlying = underlying.read().await;
557            Ok(PyReceiver {
558                receiver: Arc::new(Mutex::new(underlying.stderr())),
559            })
560        })
561    }
562
563    fn exit_code<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
564        let underlying = self.underlying.clone();
565        interruptible_future_to_py(py, async move {
566            let underlying = underlying.read().await;
567            Ok(underlying.exit_code())
568        })
569    }
570
571    #[getter]
572    fn ports(&self) -> HydroflowCratePorts {
573        HydroflowCratePorts {
574            underlying: self.underlying.clone(),
575        }
576    }
577}
578
579#[pyclass]
580#[derive(Clone)]
581struct HydroflowCratePorts {
582    underlying: Arc<RwLock<core::rust_crate::RustCrateService>>,
583}
584
585#[pymethods]
586impl HydroflowCratePorts {
587    fn __getattribute__(&self, name: String, py: Python<'_>) -> PyResult<Py<PyAny>> {
588        let arc = Arc::new(
589            self.underlying
590                .try_read()
591                .unwrap()
592                .get_port(name, &self.underlying),
593        );
594
595        Ok(Py::new(
596            py,
597            PyClassInitializer::from(HydroflowSink {
598                underlying: arc.clone(),
599            })
600            .add_subclass(HydroflowCratePort { underlying: arc }),
601        )?
602        .into_py(py))
603    }
604}
605
606#[pyclass(extends=HydroflowSink, subclass)]
607#[derive(Clone)]
608struct HydroflowCratePort {
609    underlying: Arc<core::rust_crate::ports::RustCratePortConfig>,
610}
611
612#[pymethods]
613impl HydroflowCratePort {
614    fn merge(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
615        let arc = Arc::new(self.underlying.clone().merge());
616
617        Ok(Py::new(
618            py,
619            PyClassInitializer::from(HydroflowSink {
620                underlying: arc.clone(),
621            })
622            .add_subclass(HydroflowCratePort { underlying: arc }),
623        )?
624        .into_py(py))
625    }
626
627    fn send_to(&self, to: &HydroflowSink) {
628        self.underlying.send_to(to.underlying.deref());
629    }
630
631    fn tagged(&self, tag: u32) -> TaggedSource {
632        TaggedSource {
633            underlying: Arc::new(core::rust_crate::ports::TaggedSource {
634                source: self.underlying.clone(),
635                tag,
636            }),
637        }
638    }
639}
640
641#[pyfunction]
642fn demux(mapping: &PyDict) -> HydroflowSink {
643    HydroflowSink {
644        underlying: Arc::new(core::rust_crate::ports::DemuxSink {
645            demux: mapping
646                .into_iter()
647                .map(|(k, v)| {
648                    let k = k.extract::<u32>().unwrap();
649                    let v = v.extract::<HydroflowSink>().unwrap();
650                    (k, v.underlying)
651                })
652                .collect(),
653        }),
654    }
655}
656
657#[pyclass(subclass)]
658#[derive(Clone)]
659struct TaggedSource {
660    underlying: Arc<core::rust_crate::ports::TaggedSource>,
661}
662
663#[pymethods]
664impl TaggedSource {
665    fn send_to(&self, to: &HydroflowSink) {
666        self.underlying.send_to(to.underlying.deref());
667    }
668
669    fn tagged(&self, tag: u32) -> TaggedSource {
670        TaggedSource {
671            underlying: Arc::new(core::rust_crate::ports::TaggedSource {
672                source: self.underlying.clone(),
673                tag,
674            }),
675        }
676    }
677}
678
679#[pyclass(extends=HydroflowSink, subclass)]
680#[derive(Clone)]
681struct HydroflowNull {
682    underlying: Arc<core::rust_crate::ports::NullSourceSink>,
683}
684
685#[pymethods]
686impl HydroflowNull {
687    fn send_to(&self, to: &HydroflowSink) {
688        self.underlying.send_to(to.underlying.deref());
689    }
690
691    fn tagged(&self, tag: u32) -> TaggedSource {
692        TaggedSource {
693            underlying: Arc::new(core::rust_crate::ports::TaggedSource {
694                source: self.underlying.clone(),
695                tag,
696            }),
697        }
698    }
699}
700
701#[pyfunction]
702fn null(py: Python<'_>) -> PyResult<Py<PyAny>> {
703    let arc = Arc::new(core::rust_crate::ports::NullSourceSink);
704
705    Ok(Py::new(
706        py,
707        PyClassInitializer::from(HydroflowSink {
708            underlying: arc.clone(),
709        })
710        .add_subclass(HydroflowNull { underlying: arc }),
711    )?
712    .into_py(py))
713}
714
715#[pyclass]
716struct ServerPort {
717    underlying: hydro_deploy_integration::ServerPort,
718}
719
720fn with_tokio_runtime<T>(f: impl Fn() -> T) -> T {
721    let runtime_read = TOKIO_RUNTIME.read().unwrap();
722    let _guard = runtime_read.as_ref().unwrap().enter();
723    f()
724}
725
726#[pymethods]
727impl ServerPort {
728    fn json(&self, py: Python<'_>) -> Py<PyAny> {
729        pythonize(py, &self.underlying).unwrap()
730    }
731
732    #[expect(clippy::wrong_self_convention, reason = "pymethods")]
733    fn into_source<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
734        let realized = with_tokio_runtime(|| Connection::AsClient(self.underlying.connect()));
735
736        interruptible_future_to_py(py, async move {
737            Ok(PythonStream {
738                underlying: Arc::new(RwLock::new(
739                    realized.connect::<ConnectedDirect>().await.into_source(),
740                )),
741            })
742        })
743    }
744
745    #[expect(clippy::wrong_self_convention, reason = "pymethods")]
746    fn into_sink<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
747        let realized = with_tokio_runtime(|| Connection::AsClient(self.underlying.connect()));
748
749        interruptible_future_to_py(py, async move {
750            Ok(PythonSink {
751                underlying: Arc::new(RwLock::new(
752                    realized.connect::<ConnectedDirect>().await.into_sink(),
753                )),
754            })
755        })
756    }
757}
758
759#[pyclass]
760#[derive(Clone)]
761struct PythonSink {
762    underlying: Arc<RwLock<DynSink<Bytes>>>,
763}
764
765#[pymethods]
766impl PythonSink {
767    fn send<'p>(&self, data: Py<PyBytes>, py: Python<'p>) -> PyResult<&'p PyAny> {
768        let underlying = self.underlying.clone();
769        let bytes = Bytes::from(data.as_bytes(py).to_vec());
770        interruptible_future_to_py(py, async move {
771            underlying.write().await.send(bytes).await?;
772            Ok(())
773        })
774    }
775}
776
777#[pyclass]
778#[derive(Clone)]
779struct PythonStream {
780    underlying: Arc<RwLock<DynStream>>,
781}
782
783#[pymethods]
784impl PythonStream {
785    fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
786        slf
787    }
788
789    fn __anext__<'p>(&self, py: Python<'p>) -> Option<&'p PyAny> {
790        let underlying = self.underlying.clone();
791        Some(
792            interruptible_future_to_py(py, async move {
793                let read_res = underlying.write().await.next().await;
794                read_res
795                    .and_then(|b| b.ok().map(|b| b.to_vec()))
796                    .map(Ok)
797                    .unwrap_or(Err(PyStopAsyncIteration::new_err(())))
798            })
799            .unwrap(),
800        )
801    }
802}
803
804#[pymodule]
805pub fn _core(py: Python<'_>, module: &PyModule) -> PyResult<()> {
806    unsafe {
807        pyo3::ffi::PyEval_InitThreads();
808    }
809
810    CONVERTERS_MODULE
811        .set(
812            PyModule::from_code(
813                py,
814                "
815import asyncio
816async def coroutine_to_safely_cancellable(c, cancel_token):
817    try:
818        return await asyncio.shield(c)
819    except asyncio.CancelledError:
820        cancel_token.safe_cancel()
821        await c
822        raise asyncio.CancelledError()
823",
824                "converters",
825                "converters",
826            )?
827            .into(),
828        )
829        .unwrap();
830
831    *TOKIO_RUNTIME.write().unwrap() = Some(tokio::runtime::Runtime::new().unwrap());
832    let atexit = PyModule::import(py, "atexit")?;
833    atexit.call_method1("register", (wrap_pyfunction!(cleanup_runtime, module)?,))?;
834
835    module.add("AnyhowError", py.get_type::<AnyhowError>())?;
836    module.add_class::<AnyhowWrapper>()?;
837
838    module.add_class::<HydroflowSink>()?;
839    module.add_class::<Deployment>()?;
840
841    module.add_class::<Host>()?;
842    module.add_class::<LocalhostHost>()?;
843
844    module.add_class::<GcpNetwork>()?;
845    module.add_class::<GcpComputeEngineHost>()?;
846
847    module.add_class::<Service>()?;
848    module.add_class::<CustomService>()?;
849    module.add_class::<CustomClientPort>()?;
850    module.add_class::<HydroflowCrate>()?;
851    module.add_class::<HydroflowCratePort>()?;
852
853    module.add_class::<ServerPort>()?;
854    module.add_class::<PythonSink>()?;
855    module.add_class::<PythonStream>()?;
856
857    module.add_function(wrap_pyfunction!(demux, module)?)?;
858    module.add_function(wrap_pyfunction!(null, module)?)?;
859
860    module.add_wrapped(wrap_pymodule!(cli::cli))?;
861
862    Ok(())
863}