hydro_cli/
lib.rs

1#![expect(
2    unused_qualifications,
3    non_local_definitions,
4    unsafe_op_in_unsafe_fn,
5    reason = "for pyo3 generated code"
6)]
7
8use core::rust_crate::ports::RustCrateSource;
9use std::cell::OnceCell;
10use std::collections::HashMap;
11use std::ops::Deref;
12use std::pin::Pin;
13use std::sync::{Arc, OnceLock};
14
15use bytes::Bytes;
16use futures::{Future, SinkExt, StreamExt};
17use hydro_deploy_integration::{
18    ConnectedDirect, ConnectedSink, ConnectedSource, Connection, DynSink, DynStream,
19};
20use pyo3::exceptions::{PyException, PyStopAsyncIteration};
21use pyo3::prelude::*;
22use pyo3::types::{PyBytes, PyDict};
23use pyo3::{create_exception, wrap_pymodule};
24use pyo3_asyncio::TaskLocals;
25use pythonize::pythonize;
26use tokio::sync::oneshot::Sender;
27use tokio::sync::{Mutex, RwLock};
28
29mod cli;
30use hydro_deploy::ssh::LaunchedSshHost;
31use hydro_deploy::{self as core};
32
33static TOKIO_RUNTIME: std::sync::RwLock<Option<tokio::runtime::Runtime>> =
34    std::sync::RwLock::new(None);
35
36#[pyfunction]
37fn cleanup_runtime() {
38    drop(TOKIO_RUNTIME.write().unwrap().take());
39}
40
41struct TokioRuntime {}
42
43impl pyo3_asyncio::generic::Runtime for TokioRuntime {
44    type JoinError = tokio::task::JoinError;
45    type JoinHandle = tokio::task::JoinHandle<()>;
46
47    fn spawn<F>(fut: F) -> Self::JoinHandle
48    where
49        F: Future<Output = ()> + Send + 'static,
50    {
51        TOKIO_RUNTIME
52            .read()
53            .unwrap()
54            .as_ref()
55            .unwrap()
56            .spawn(async move {
57                fut.await;
58            })
59    }
60}
61
62tokio::task_local! {
63    static TASK_LOCALS: OnceCell<TaskLocals>;
64}
65
66impl pyo3_asyncio::generic::ContextExt for TokioRuntime {
67    fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
68    where
69        F: Future<Output = R> + Send + 'static,
70    {
71        let cell = OnceCell::new();
72        cell.set(locals).unwrap();
73
74        Box::pin(TASK_LOCALS.scope(cell, fut))
75    }
76
77    fn get_task_locals() -> Option<TaskLocals> {
78        TASK_LOCALS
79            .try_with(|c| c.get().cloned())
80            .unwrap_or_default()
81    }
82}
83
84create_exception!(hydro_cli_core, AnyhowError, PyException);
85
86#[pyclass]
87struct SafeCancelToken {
88    cancel_tx: Option<Sender<()>>,
89}
90
91#[pymethods]
92impl SafeCancelToken {
93    fn safe_cancel(&mut self) {
94        if let Some(token) = self.cancel_tx.take() {
95            let _ = token.send(());
96        }
97    }
98}
99
100static CONVERTERS_MODULE: OnceLock<Py<PyModule>> = OnceLock::new();
101
102fn interruptible_future_to_py<F, T>(py: Python<'_>, fut: F) -> PyResult<&PyAny>
103where
104    F: Future<Output = PyResult<T>> + Send + 'static,
105    T: IntoPy<PyObject>,
106{
107    let module = CONVERTERS_MODULE.get().unwrap().clone().into_ref(py);
108
109    let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
110
111    let base_coro = pyo3_asyncio::generic::future_into_py::<TokioRuntime, _, _>(py, async move {
112        tokio::select! {
113            biased;
114            _ = cancel_rx => Ok(None),
115            r = fut => r.map(|o| Some(o))
116        }
117    })?;
118
119    module.call_method1(
120        "coroutine_to_safely_cancellable",
121        (
122            base_coro,
123            SafeCancelToken {
124                cancel_tx: Some(cancel_tx),
125            },
126        ),
127    )
128}
129
130#[pyclass]
131#[derive(Clone)]
132pub struct AnyhowWrapper {
133    pub underlying: Arc<RwLock<Option<anyhow::Error>>>,
134}
135
136#[pymethods]
137impl AnyhowWrapper {
138    fn __str__(&self) -> PyResult<String> {
139        Ok(format!(
140            "{:?}",
141            self.underlying.try_read().unwrap().as_ref().unwrap()
142        ))
143    }
144}
145
146#[pyclass(subclass)]
147#[derive(Clone)]
148struct HydroflowSink {
149    underlying: Arc<dyn core::rust_crate::ports::RustCrateSink>,
150}
151
152#[pyclass(name = "Deployment")]
153pub struct Deployment {
154    underlying: Arc<RwLock<core::Deployment>>,
155}
156
157#[pymethods]
158impl Deployment {
159    #[new]
160    fn new() -> Self {
161        Deployment {
162            underlying: Arc::new(RwLock::new(core::Deployment::new())),
163        }
164    }
165
166    #[expect(non_snake_case, reason = "pymethods")]
167    fn Localhost(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
168        let arc = self.underlying.blocking_read().Localhost();
169
170        Ok(Py::new(
171            py,
172            PyClassInitializer::from(Host {
173                underlying: arc.clone(),
174            })
175            .add_subclass(LocalhostHost { underlying: arc }),
176        )?
177        .into_py(py))
178    }
179
180    #[expect(non_snake_case, clippy::too_many_arguments, reason = "pymethods")]
181    fn GcpComputeEngineHost(
182        &self,
183        py: Python<'_>,
184        project: String,
185        machine_type: String,
186        image: String,
187        region: String,
188        network: GcpNetwork,
189        user: Option<String>,
190    ) -> PyResult<Py<PyAny>> {
191        let arc = self.underlying.blocking_write().add_host(|id| {
192            core::GcpComputeEngineHost::new(
193                id,
194                project,
195                machine_type,
196                image,
197                region,
198                network.underlying,
199                user,
200            )
201        });
202
203        Ok(Py::new(
204            py,
205            PyClassInitializer::from(Host {
206                underlying: arc.clone(),
207            })
208            .add_subclass(GcpComputeEngineHost { underlying: arc }),
209        )?
210        .into_py(py))
211    }
212
213    #[expect(non_snake_case, clippy::too_many_arguments, reason = "pymethods")]
214    fn AzureHost(
215        &self,
216        py: Python<'_>,
217        project: String,
218        os_type: String, // linux or windows
219        machine_size: String,
220        region: String,
221        image: Option<HashMap<String, String>>,
222        user: Option<String>,
223    ) -> PyResult<Py<PyAny>> {
224        let arc = self.underlying.blocking_write().add_host(|id| {
225            core::AzureHost::new(id, project, os_type, machine_size, image, region, user)
226        });
227
228        Ok(Py::new(
229            py,
230            PyClassInitializer::from(Host {
231                underlying: arc.clone(),
232            })
233            .add_subclass(AzureHost { underlying: arc }),
234        )?
235        .into_py(py))
236    }
237
238    #[expect(non_snake_case, reason = "pymethods")]
239    fn CustomService(
240        &self,
241        py: Python<'_>,
242        on: &Host,
243        external_ports: Vec<u16>,
244    ) -> PyResult<Py<PyAny>> {
245        let service = self
246            .underlying
247            .blocking_write()
248            .CustomService(on.underlying.clone(), external_ports);
249
250        Ok(Py::new(
251            py,
252            PyClassInitializer::from(Service {
253                underlying: service.clone(),
254            })
255            .add_subclass(CustomService {
256                underlying: service,
257            }),
258        )?
259        .into_py(py))
260    }
261
262    #[expect(non_snake_case, clippy::too_many_arguments, reason = "pymethods")]
263    fn HydroflowCrate(
264        &self,
265        py: Python<'_>,
266        src: String,
267        on: &Host,
268        bin: Option<String>,
269        example: Option<String>,
270        profile: Option<String>,
271        features: Option<Vec<String>>,
272        args: Option<Vec<String>>,
273        display_id: Option<String>,
274        external_ports: Option<Vec<u16>>,
275    ) -> PyResult<Py<PyAny>> {
276        let service = self.underlying.blocking_write().add_service(|id| {
277            core::rust_crate::RustCrateService::new(
278                id,
279                src.into(),
280                on.underlying.clone(),
281                bin,
282                example,
283                profile,
284                None,  // Python API doesn't support rustflags
285                None,  // Python API doesn't support target_dir
286                false, // Python API doesn't support no_default_features
287                None,  // Python API doesn't support perf
288                features,
289                args,
290                display_id,
291                external_ports.unwrap_or_default(),
292            )
293        });
294
295        Ok(Py::new(
296            py,
297            PyClassInitializer::from(Service {
298                underlying: service.clone(),
299            })
300            .add_subclass(HydroflowCrate {
301                underlying: service,
302            }),
303        )?
304        .into_py(py))
305    }
306
307    fn deploy<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
308        let underlying = self.underlying.clone();
309        let py_none = py.None();
310        interruptible_future_to_py(py, async move {
311            underlying.write().await.deploy().await.map_err(|e| {
312                AnyhowError::new_err(AnyhowWrapper {
313                    underlying: Arc::new(RwLock::new(Some(e))),
314                })
315            })?;
316            Ok(py_none)
317        })
318    }
319
320    fn start<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
321        let underlying = self.underlying.clone();
322        let py_none = py.None();
323        interruptible_future_to_py(py, async move {
324            underlying.write().await.start().await.map_err(|e| {
325                AnyhowError::new_err(AnyhowWrapper {
326                    underlying: Arc::new(RwLock::new(Some(e))),
327                })
328            })?;
329            Ok(py_none)
330        })
331    }
332}
333
334#[pyclass(subclass)]
335pub struct Host {
336    underlying: Arc<dyn core::Host>,
337}
338
339#[pyclass(extends=Host, subclass)]
340struct LocalhostHost {
341    underlying: Arc<core::LocalhostHost>,
342}
343
344#[pymethods]
345impl LocalhostHost {
346    fn client_only(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
347        let arc = Arc::new(self.underlying.client_only());
348
349        Ok(Py::new(
350            py,
351            PyClassInitializer::from(Host {
352                underlying: arc.clone(),
353            })
354            .add_subclass(LocalhostHost { underlying: arc }),
355        )?
356        .into_py(py))
357    }
358}
359
360#[pyclass]
361#[derive(Clone)]
362struct GcpNetwork {
363    underlying: Arc<RwLock<core::gcp::GcpNetwork>>,
364}
365
366#[pymethods]
367impl GcpNetwork {
368    #[new]
369    fn new(project: String, existing: Option<String>) -> Self {
370        GcpNetwork {
371            underlying: Arc::new(RwLock::new(core::gcp::GcpNetwork::new(project, existing))),
372        }
373    }
374}
375
376#[pyclass(extends=Host, subclass)]
377struct GcpComputeEngineHost {
378    underlying: Arc<core::GcpComputeEngineHost>,
379}
380
381#[pymethods]
382impl GcpComputeEngineHost {
383    #[getter]
384    fn internal_ip(&self) -> String {
385        self.underlying.launched.get().unwrap().internal_ip.clone()
386    }
387
388    #[getter]
389    fn external_ip(&self) -> Option<String> {
390        self.underlying.launched.get().unwrap().external_ip.clone()
391    }
392
393    #[getter]
394    fn ssh_key_path(&self) -> String {
395        self.underlying
396            .launched
397            .get()
398            .unwrap()
399            .ssh_key_path()
400            .to_str()
401            .unwrap()
402            .to_string()
403    }
404}
405
406#[pyclass(extends=Host, subclass)]
407struct AzureHost {
408    underlying: Arc<core::AzureHost>,
409}
410
411#[pymethods]
412impl AzureHost {
413    #[getter]
414    fn internal_ip(&self) -> String {
415        self.underlying.launched.get().unwrap().internal_ip.clone()
416    }
417
418    #[getter]
419    fn external_ip(&self) -> Option<String> {
420        self.underlying.launched.get().unwrap().external_ip.clone()
421    }
422
423    #[getter]
424    fn ssh_key_path(&self) -> String {
425        self.underlying
426            .launched
427            .get()
428            .unwrap()
429            .ssh_key_path()
430            .to_str()
431            .unwrap()
432            .to_string()
433    }
434}
435
436#[pyclass(subclass)]
437pub struct Service {
438    underlying: Arc<RwLock<dyn core::Service>>,
439}
440
441#[pymethods]
442impl Service {
443    fn stop<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
444        let underlying = self.underlying.clone();
445        let py_none = py.None();
446        interruptible_future_to_py(py, async move {
447            underlying.write().await.stop().await.unwrap();
448            Ok(py_none)
449        })
450    }
451}
452
453#[pyclass]
454struct PyReceiver {
455    receiver: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<String>>>,
456}
457
458#[pymethods]
459impl PyReceiver {
460    fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
461        slf
462    }
463
464    fn __anext__<'p>(&self, py: Python<'p>) -> Option<&'p PyAny> {
465        let receiver = self.receiver.clone();
466        Some(
467            interruptible_future_to_py(py, async move {
468                receiver
469                    .lock()
470                    .await
471                    .recv()
472                    .await
473                    .ok_or_else(|| PyStopAsyncIteration::new_err(()))
474            })
475            .unwrap(),
476        )
477    }
478}
479
480#[pyclass(extends=Service, subclass)]
481struct CustomService {
482    underlying: Arc<RwLock<core::CustomService>>,
483}
484
485#[pymethods]
486impl CustomService {
487    fn client_port(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
488        let arc = Arc::new(core::custom_service::CustomClientPort::new(Arc::downgrade(
489            &self.underlying,
490        )));
491
492        Ok(Py::new(
493            py,
494            PyClassInitializer::from(HydroflowSink {
495                underlying: arc.clone(),
496            })
497            .add_subclass(CustomClientPort { underlying: arc }),
498        )?
499        .into_py(py))
500    }
501}
502
503#[pyclass(extends=HydroflowSink, subclass)]
504#[derive(Clone)]
505struct CustomClientPort {
506    underlying: Arc<core::custom_service::CustomClientPort>,
507}
508
509#[pymethods]
510impl CustomClientPort {
511    fn send_to(&self, to: &HydroflowSink) {
512        self.underlying.send_to(to.underlying.deref());
513    }
514
515    fn tagged(&self, tag: u32) -> TaggedSource {
516        TaggedSource {
517            underlying: Arc::new(core::rust_crate::ports::TaggedSource {
518                source: self.underlying.clone(),
519                tag,
520            }),
521        }
522    }
523
524    fn server_port<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
525        let underlying = self.underlying.clone();
526        interruptible_future_to_py(py, async move {
527            Ok(ServerPort {
528                underlying: underlying.server_port().await,
529            })
530        })
531    }
532}
533
534#[pyclass(extends=Service, subclass)]
535struct HydroflowCrate {
536    underlying: Arc<RwLock<core::rust_crate::RustCrateService>>,
537}
538
539#[pymethods]
540impl HydroflowCrate {
541    fn stdout<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
542        let underlying = self.underlying.clone();
543        interruptible_future_to_py(py, async move {
544            let underlying = underlying.read().await;
545            Ok(PyReceiver {
546                receiver: Arc::new(Mutex::new(underlying.stdout())),
547            })
548        })
549    }
550
551    fn stderr<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
552        let underlying = self.underlying.clone();
553        interruptible_future_to_py(py, async move {
554            let underlying = underlying.read().await;
555            Ok(PyReceiver {
556                receiver: Arc::new(Mutex::new(underlying.stderr())),
557            })
558        })
559    }
560
561    fn exit_code<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
562        let underlying = self.underlying.clone();
563        interruptible_future_to_py(py, async move {
564            let underlying = underlying.read().await;
565            Ok(underlying.exit_code())
566        })
567    }
568
569    #[getter]
570    fn ports(&self) -> HydroflowCratePorts {
571        HydroflowCratePorts {
572            underlying: self.underlying.clone(),
573        }
574    }
575}
576
577#[pyclass]
578#[derive(Clone)]
579struct HydroflowCratePorts {
580    underlying: Arc<RwLock<core::rust_crate::RustCrateService>>,
581}
582
583#[pymethods]
584impl HydroflowCratePorts {
585    fn __getattribute__(&self, name: String, py: Python<'_>) -> PyResult<Py<PyAny>> {
586        let arc = Arc::new(
587            self.underlying
588                .try_read()
589                .unwrap()
590                .get_port(name, &self.underlying),
591        );
592
593        Ok(Py::new(
594            py,
595            PyClassInitializer::from(HydroflowSink {
596                underlying: arc.clone(),
597            })
598            .add_subclass(HydroflowCratePort { underlying: arc }),
599        )?
600        .into_py(py))
601    }
602}
603
604#[pyclass(extends=HydroflowSink, subclass)]
605#[derive(Clone)]
606struct HydroflowCratePort {
607    underlying: Arc<core::rust_crate::ports::RustCratePortConfig>,
608}
609
610#[pymethods]
611impl HydroflowCratePort {
612    fn merge(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
613        let arc = Arc::new(self.underlying.clone().merge());
614
615        Ok(Py::new(
616            py,
617            PyClassInitializer::from(HydroflowSink {
618                underlying: arc.clone(),
619            })
620            .add_subclass(HydroflowCratePort { underlying: arc }),
621        )?
622        .into_py(py))
623    }
624
625    fn send_to(&self, to: &HydroflowSink) {
626        self.underlying.send_to(to.underlying.deref());
627    }
628
629    fn tagged(&self, tag: u32) -> TaggedSource {
630        TaggedSource {
631            underlying: Arc::new(core::rust_crate::ports::TaggedSource {
632                source: self.underlying.clone(),
633                tag,
634            }),
635        }
636    }
637}
638
639#[pyfunction]
640fn demux(mapping: &PyDict) -> HydroflowSink {
641    HydroflowSink {
642        underlying: Arc::new(core::rust_crate::ports::DemuxSink {
643            demux: mapping
644                .into_iter()
645                .map(|(k, v)| {
646                    let k = k.extract::<u32>().unwrap();
647                    let v = v.extract::<HydroflowSink>().unwrap();
648                    (k, v.underlying)
649                })
650                .collect(),
651        }),
652    }
653}
654
655#[pyclass(subclass)]
656#[derive(Clone)]
657struct TaggedSource {
658    underlying: Arc<core::rust_crate::ports::TaggedSource>,
659}
660
661#[pymethods]
662impl TaggedSource {
663    fn send_to(&self, to: &HydroflowSink) {
664        self.underlying.send_to(to.underlying.deref());
665    }
666
667    fn tagged(&self, tag: u32) -> TaggedSource {
668        TaggedSource {
669            underlying: Arc::new(core::rust_crate::ports::TaggedSource {
670                source: self.underlying.clone(),
671                tag,
672            }),
673        }
674    }
675}
676
677#[pyclass(extends=HydroflowSink, subclass)]
678#[derive(Clone)]
679struct HydroflowNull {
680    underlying: Arc<core::rust_crate::ports::NullSourceSink>,
681}
682
683#[pymethods]
684impl HydroflowNull {
685    fn send_to(&self, to: &HydroflowSink) {
686        self.underlying.send_to(to.underlying.deref());
687    }
688
689    fn tagged(&self, tag: u32) -> TaggedSource {
690        TaggedSource {
691            underlying: Arc::new(core::rust_crate::ports::TaggedSource {
692                source: self.underlying.clone(),
693                tag,
694            }),
695        }
696    }
697}
698
699#[pyfunction]
700fn null(py: Python<'_>) -> PyResult<Py<PyAny>> {
701    let arc = Arc::new(core::rust_crate::ports::NullSourceSink);
702
703    Ok(Py::new(
704        py,
705        PyClassInitializer::from(HydroflowSink {
706            underlying: arc.clone(),
707        })
708        .add_subclass(HydroflowNull { underlying: arc }),
709    )?
710    .into_py(py))
711}
712
713#[pyclass]
714struct ServerPort {
715    underlying: hydro_deploy_integration::ServerPort,
716}
717
718fn with_tokio_runtime<T>(f: impl Fn() -> T) -> T {
719    let runtime_read = TOKIO_RUNTIME.read().unwrap();
720    let _guard = runtime_read.as_ref().unwrap().enter();
721    f()
722}
723
724#[pymethods]
725impl ServerPort {
726    fn json(&self, py: Python<'_>) -> Py<PyAny> {
727        pythonize(py, &self.underlying).unwrap()
728    }
729
730    #[expect(clippy::wrong_self_convention, reason = "pymethods")]
731    fn into_source<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
732        let realized = with_tokio_runtime(|| Connection::AsClient(self.underlying.connect()));
733
734        interruptible_future_to_py(py, async move {
735            Ok(PythonStream {
736                underlying: Arc::new(RwLock::new(
737                    realized.connect::<ConnectedDirect>().await.into_source(),
738                )),
739            })
740        })
741    }
742
743    #[expect(clippy::wrong_self_convention, reason = "pymethods")]
744    fn into_sink<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
745        let realized = with_tokio_runtime(|| Connection::AsClient(self.underlying.connect()));
746
747        interruptible_future_to_py(py, async move {
748            Ok(PythonSink {
749                underlying: Arc::new(RwLock::new(
750                    realized.connect::<ConnectedDirect>().await.into_sink(),
751                )),
752            })
753        })
754    }
755}
756
757#[pyclass]
758#[derive(Clone)]
759struct PythonSink {
760    underlying: Arc<RwLock<DynSink<Bytes>>>,
761}
762
763#[pymethods]
764impl PythonSink {
765    fn send<'p>(&self, data: Py<PyBytes>, py: Python<'p>) -> PyResult<&'p PyAny> {
766        let underlying = self.underlying.clone();
767        let bytes = Bytes::from(data.as_bytes(py).to_vec());
768        interruptible_future_to_py(py, async move {
769            underlying.write().await.send(bytes).await?;
770            Ok(())
771        })
772    }
773}
774
775#[pyclass]
776#[derive(Clone)]
777struct PythonStream {
778    underlying: Arc<RwLock<DynStream>>,
779}
780
781#[pymethods]
782impl PythonStream {
783    fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
784        slf
785    }
786
787    fn __anext__<'p>(&self, py: Python<'p>) -> Option<&'p PyAny> {
788        let underlying = self.underlying.clone();
789        Some(
790            interruptible_future_to_py(py, async move {
791                let read_res = underlying.write().await.next().await;
792                read_res
793                    .and_then(|b| b.ok().map(|b| b.to_vec()))
794                    .map(Ok)
795                    .unwrap_or(Err(PyStopAsyncIteration::new_err(())))
796            })
797            .unwrap(),
798        )
799    }
800}
801
802#[pymodule]
803pub fn _core(py: Python<'_>, module: &PyModule) -> PyResult<()> {
804    unsafe {
805        pyo3::ffi::PyEval_InitThreads();
806    }
807
808    CONVERTERS_MODULE
809        .set(
810            PyModule::from_code(
811                py,
812                "
813import asyncio
814async def coroutine_to_safely_cancellable(c, cancel_token):
815    try:
816        return await asyncio.shield(c)
817    except asyncio.CancelledError:
818        cancel_token.safe_cancel()
819        await c
820        raise asyncio.CancelledError()
821",
822                "converters",
823                "converters",
824            )?
825            .into(),
826        )
827        .unwrap();
828
829    *TOKIO_RUNTIME.write().unwrap() = Some(tokio::runtime::Runtime::new().unwrap());
830    let atexit = PyModule::import(py, "atexit")?;
831    atexit.call_method1("register", (wrap_pyfunction!(cleanup_runtime, module)?,))?;
832
833    module.add("AnyhowError", py.get_type::<AnyhowError>())?;
834    module.add_class::<AnyhowWrapper>()?;
835
836    module.add_class::<HydroflowSink>()?;
837    module.add_class::<Deployment>()?;
838
839    module.add_class::<Host>()?;
840    module.add_class::<LocalhostHost>()?;
841
842    module.add_class::<GcpNetwork>()?;
843    module.add_class::<GcpComputeEngineHost>()?;
844
845    module.add_class::<Service>()?;
846    module.add_class::<CustomService>()?;
847    module.add_class::<CustomClientPort>()?;
848    module.add_class::<HydroflowCrate>()?;
849    module.add_class::<HydroflowCratePort>()?;
850
851    module.add_class::<ServerPort>()?;
852    module.add_class::<PythonSink>()?;
853    module.add_class::<PythonStream>()?;
854
855    module.add_function(wrap_pyfunction!(demux, module)?)?;
856    module.add_function(wrap_pyfunction!(null, module)?)?;
857
858    module.add_wrapped(wrap_pymodule!(cli::cli))?;
859
860    Ok(())
861}