1#![expect(
2 unused_qualifications,
3 non_local_definitions,
4 unsafe_op_in_unsafe_fn,
5 reason = "for pyo3 generated code"
6)]
7
8use core::rust_crate::ports::RustCrateSource;
9use std::cell::OnceCell;
10use std::collections::HashMap;
11use std::ops::Deref;
12use std::pin::Pin;
13use std::sync::{Arc, OnceLock};
14
15use bytes::Bytes;
16use futures::{Future, SinkExt, StreamExt};
17use hydro_deploy_integration::{
18 ConnectedDirect, ConnectedSink, ConnectedSource, Connection, DynSink, DynStream,
19};
20use pyo3::exceptions::{PyException, PyStopAsyncIteration};
21use pyo3::prelude::*;
22use pyo3::types::{PyBytes, PyDict};
23use pyo3::{create_exception, wrap_pymodule};
24use pyo3_asyncio::TaskLocals;
25use pythonize::pythonize;
26use tokio::sync::oneshot::Sender;
27use tokio::sync::{Mutex, RwLock};
28
29mod cli;
30use hydro_deploy::ssh::LaunchedSshHost;
31use hydro_deploy::{self as core};
32
/// Process-wide Tokio runtime shared by the bindings in this file.
///
/// Starts out as `None`; `_core` stores a runtime here during module initialization and
/// `cleanup_runtime` takes it back out.
static TOKIO_RUNTIME: std::sync::RwLock<Option<tokio::runtime::Runtime>> =
    std::sync::RwLock::new(None);
35
36#[pyfunction]
37fn cleanup_runtime() {
38 drop(TOKIO_RUNTIME.write().unwrap().take());
39}
40
41struct TokioRuntime {}
42
43impl pyo3_asyncio::generic::Runtime for TokioRuntime {
44 type JoinError = tokio::task::JoinError;
45 type JoinHandle = tokio::task::JoinHandle<()>;
46
47 fn spawn<F>(fut: F) -> Self::JoinHandle
48 where
49 F: Future<Output = ()> + Send + 'static,
50 {
51 TOKIO_RUNTIME
52 .read()
53 .unwrap()
54 .as_ref()
55 .unwrap()
56 .spawn(async move {
57 fut.await;
58 })
59 }
60}
61
tokio::task_local! {
    // Per-task slot for the `pyo3_asyncio` task locals: filled by `ContextExt::scope` and
    // read back by `ContextExt::get_task_locals`.
    static TASK_LOCALS: OnceCell<TaskLocals>;
}
65
66impl pyo3_asyncio::generic::ContextExt for TokioRuntime {
67 fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
68 where
69 F: Future<Output = R> + Send + 'static,
70 {
71 let cell = OnceCell::new();
72 cell.set(locals).unwrap();
73
74 Box::pin(TASK_LOCALS.scope(cell, fut))
75 }
76
77 fn get_task_locals() -> Option<TaskLocals> {
78 TASK_LOCALS
79 .try_with(|c| c.get().cloned())
80 .unwrap_or_default()
81 }
82}
83
// Declares the Python exception type `AnyhowError` (derived from `PyException`) for the
// `hydro_cli_core` module; the bindings below raise it with an `AnyhowWrapper` payload.
create_exception!(hydro_cli_core, AnyhowError, PyException);
85
86#[pyclass]
87struct SafeCancelToken {
88 cancel_tx: Option<Sender<()>>,
89}
90
91#[pymethods]
92impl SafeCancelToken {
93 fn safe_cancel(&mut self) {
94 if let Some(token) = self.cancel_tx.take() {
95 let _ = token.send(());
96 }
97 }
98}
99
/// Handle to the helper Python module (`converters`) that `_core` compiles and stores during
/// module initialization; read by `interruptible_future_to_py`.
static CONVERTERS_MODULE: OnceLock<Py<PyModule>> = OnceLock::new();
101
102fn interruptible_future_to_py<F, T>(py: Python<'_>, fut: F) -> PyResult<&PyAny>
103where
104 F: Future<Output = PyResult<T>> + Send + 'static,
105 T: IntoPy<PyObject>,
106{
107 let module = CONVERTERS_MODULE.get().unwrap().clone().into_ref(py);
108
109 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
110
111 let base_coro = pyo3_asyncio::generic::future_into_py::<TokioRuntime, _, _>(py, async move {
112 tokio::select! {
113 biased;
114 _ = cancel_rx => Ok(None),
115 r = fut => r.map(|o| Some(o))
116 }
117 })?;
118
119 module.call_method1(
120 "coroutine_to_safely_cancellable",
121 (
122 base_coro,
123 SafeCancelToken {
124 cancel_tx: Some(cancel_tx),
125 },
126 ),
127 )
128}
129
130#[pyclass]
131#[derive(Clone)]
132pub struct AnyhowWrapper {
133 pub underlying: Arc<RwLock<Option<anyhow::Error>>>,
134}
135
136#[pymethods]
137impl AnyhowWrapper {
138 fn __str__(&self) -> PyResult<String> {
139 Ok(format!(
140 "{:?}",
141 self.underlying.try_read().unwrap().as_ref().unwrap()
142 ))
143 }
144}
145
// Base Python class for anything that can be used as a sink; wraps a shared
// `RustCrateSink` trait object. It is declared `subclass` so that the concrete port classes
// in this file can extend it.
#[pyclass(subclass)]
#[derive(Clone)]
struct HydroflowSink {
    // Type-erased sink handed to the `send_to` methods of the source classes.
    underlying: Arc<dyn core::rust_crate::ports::RustCrateSink>,
}
151
152#[pyclass(name = "Deployment")]
153pub struct Deployment {
154 underlying: Arc<RwLock<core::Deployment>>,
155}
156
157#[pymethods]
158impl Deployment {
159 #[new]
160 fn new() -> Self {
161 Deployment {
162 underlying: Arc::new(RwLock::new(core::Deployment::new())),
163 }
164 }
165
166 #[expect(non_snake_case, reason = "pymethods")]
167 fn Localhost(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
168 let arc = self.underlying.blocking_read().Localhost();
169
170 Ok(Py::new(
171 py,
172 PyClassInitializer::from(Host {
173 underlying: arc.clone(),
174 })
175 .add_subclass(LocalhostHost { underlying: arc }),
176 )?
177 .into_py(py))
178 }
179
180 #[expect(non_snake_case, clippy::too_many_arguments, reason = "pymethods")]
181 fn GcpComputeEngineHost(
182 &self,
183 py: Python<'_>,
184 project: String,
185 machine_type: String,
186 image: String,
187 region: String,
188 network: GcpNetwork,
189 user: Option<String>,
190 ) -> PyResult<Py<PyAny>> {
191 let arc = self.underlying.blocking_write().add_host(|id| {
192 core::GcpComputeEngineHost::new(
193 id,
194 project,
195 machine_type,
196 image,
197 region,
198 network.underlying,
199 user,
200 )
201 });
202
203 Ok(Py::new(
204 py,
205 PyClassInitializer::from(Host {
206 underlying: arc.clone(),
207 })
208 .add_subclass(GcpComputeEngineHost { underlying: arc }),
209 )?
210 .into_py(py))
211 }
212
213 #[expect(non_snake_case, clippy::too_many_arguments, reason = "pymethods")]
214 fn AzureHost(
215 &self,
216 py: Python<'_>,
217 project: String,
218 os_type: String, machine_size: String,
220 region: String,
221 image: Option<HashMap<String, String>>,
222 user: Option<String>,
223 ) -> PyResult<Py<PyAny>> {
224 let arc = self.underlying.blocking_write().add_host(|id| {
225 core::AzureHost::new(id, project, os_type, machine_size, image, region, user)
226 });
227
228 Ok(Py::new(
229 py,
230 PyClassInitializer::from(Host {
231 underlying: arc.clone(),
232 })
233 .add_subclass(AzureHost { underlying: arc }),
234 )?
235 .into_py(py))
236 }
237
238 #[expect(non_snake_case, reason = "pymethods")]
239 fn CustomService(
240 &self,
241 py: Python<'_>,
242 on: &Host,
243 external_ports: Vec<u16>,
244 ) -> PyResult<Py<PyAny>> {
245 let service = self
246 .underlying
247 .blocking_write()
248 .CustomService(on.underlying.clone(), external_ports);
249
250 Ok(Py::new(
251 py,
252 PyClassInitializer::from(Service {
253 underlying: service.clone(),
254 })
255 .add_subclass(CustomService {
256 underlying: service,
257 }),
258 )?
259 .into_py(py))
260 }
261
262 #[expect(non_snake_case, clippy::too_many_arguments, reason = "pymethods")]
263 fn HydroflowCrate(
264 &self,
265 py: Python<'_>,
266 src: String,
267 on: &Host,
268 bin: Option<String>,
269 example: Option<String>,
270 profile: Option<String>,
271 features: Option<Vec<String>>,
272 args: Option<Vec<String>>,
273 display_id: Option<String>,
274 external_ports: Option<Vec<u16>>,
275 ) -> PyResult<Py<PyAny>> {
276 let service = self.underlying.blocking_write().add_service(|id| {
277 core::rust_crate::RustCrateService::new(
278 id,
279 src.into(),
280 on.underlying.clone(),
281 bin,
282 example,
283 profile,
284 None, None, false, None, features,
289 args,
290 display_id,
291 external_ports.unwrap_or_default(),
292 )
293 });
294
295 Ok(Py::new(
296 py,
297 PyClassInitializer::from(Service {
298 underlying: service.clone(),
299 })
300 .add_subclass(HydroflowCrate {
301 underlying: service,
302 }),
303 )?
304 .into_py(py))
305 }
306
307 fn deploy<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
308 let underlying = self.underlying.clone();
309 let py_none = py.None();
310 interruptible_future_to_py(py, async move {
311 underlying.write().await.deploy().await.map_err(|e| {
312 AnyhowError::new_err(AnyhowWrapper {
313 underlying: Arc::new(RwLock::new(Some(e))),
314 })
315 })?;
316 Ok(py_none)
317 })
318 }
319
320 fn start<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
321 let underlying = self.underlying.clone();
322 let py_none = py.None();
323 interruptible_future_to_py(py, async move {
324 underlying.write().await.start().await.map_err(|e| {
325 AnyhowError::new_err(AnyhowWrapper {
326 underlying: Arc::new(RwLock::new(Some(e))),
327 })
328 })?;
329 Ok(py_none)
330 })
331 }
332}
333
// Base Python class for every host kind; wraps a shared `core::Host` trait object. The
// concrete host classes in this file (`LocalhostHost`, `GcpComputeEngineHost`, `AzureHost`)
// extend it.
#[pyclass(subclass)]
pub struct Host {
    // Type-erased host handed to the service constructors on `Deployment`.
    underlying: Arc<dyn core::Host>,
}
338
339#[pyclass(extends=Host, subclass)]
340struct LocalhostHost {
341 underlying: Arc<core::LocalhostHost>,
342}
343
344#[pymethods]
345impl LocalhostHost {
346 fn client_only(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
347 let arc = Arc::new(self.underlying.client_only());
348
349 Ok(Py::new(
350 py,
351 PyClassInitializer::from(Host {
352 underlying: arc.clone(),
353 })
354 .add_subclass(LocalhostHost { underlying: arc }),
355 )?
356 .into_py(py))
357 }
358}
359
360#[pyclass]
361#[derive(Clone)]
362struct GcpNetwork {
363 underlying: Arc<RwLock<core::gcp::GcpNetwork>>,
364}
365
366#[pymethods]
367impl GcpNetwork {
368 #[new]
369 fn new(project: String, existing: Option<String>) -> Self {
370 GcpNetwork {
371 underlying: Arc::new(RwLock::new(core::gcp::GcpNetwork::new(project, existing))),
372 }
373 }
374}
375
376#[pyclass(extends=Host, subclass)]
377struct GcpComputeEngineHost {
378 underlying: Arc<core::GcpComputeEngineHost>,
379}
380
381#[pymethods]
382impl GcpComputeEngineHost {
383 #[getter]
384 fn internal_ip(&self) -> String {
385 self.underlying.launched.get().unwrap().internal_ip.clone()
386 }
387
388 #[getter]
389 fn external_ip(&self) -> Option<String> {
390 self.underlying.launched.get().unwrap().external_ip.clone()
391 }
392
393 #[getter]
394 fn ssh_key_path(&self) -> String {
395 self.underlying
396 .launched
397 .get()
398 .unwrap()
399 .ssh_key_path()
400 .to_str()
401 .unwrap()
402 .to_string()
403 }
404}
405
406#[pyclass(extends=Host, subclass)]
407struct AzureHost {
408 underlying: Arc<core::AzureHost>,
409}
410
411#[pymethods]
412impl AzureHost {
413 #[getter]
414 fn internal_ip(&self) -> String {
415 self.underlying.launched.get().unwrap().internal_ip.clone()
416 }
417
418 #[getter]
419 fn external_ip(&self) -> Option<String> {
420 self.underlying.launched.get().unwrap().external_ip.clone()
421 }
422
423 #[getter]
424 fn ssh_key_path(&self) -> String {
425 self.underlying
426 .launched
427 .get()
428 .unwrap()
429 .ssh_key_path()
430 .to_str()
431 .unwrap()
432 .to_string()
433 }
434}
435
436#[pyclass(subclass)]
437pub struct Service {
438 underlying: Arc<RwLock<dyn core::Service>>,
439}
440
441#[pymethods]
442impl Service {
443 fn stop<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
444 let underlying = self.underlying.clone();
445 let py_none = py.None();
446 interruptible_future_to_py(py, async move {
447 underlying.write().await.stop().await.unwrap();
448 Ok(py_none)
449 })
450 }
451}
452
453#[pyclass]
454struct PyReceiver {
455 receiver: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<String>>>,
456}
457
458#[pymethods]
459impl PyReceiver {
460 fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
461 slf
462 }
463
464 fn __anext__<'p>(&self, py: Python<'p>) -> Option<&'p PyAny> {
465 let receiver = self.receiver.clone();
466 Some(
467 interruptible_future_to_py(py, async move {
468 receiver
469 .lock()
470 .await
471 .recv()
472 .await
473 .ok_or_else(|| PyStopAsyncIteration::new_err(()))
474 })
475 .unwrap(),
476 )
477 }
478}
479
480#[pyclass(extends=Service, subclass)]
481struct CustomService {
482 underlying: Arc<RwLock<core::CustomService>>,
483}
484
485#[pymethods]
486impl CustomService {
487 fn client_port(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
488 let arc = Arc::new(core::custom_service::CustomClientPort::new(Arc::downgrade(
489 &self.underlying,
490 )));
491
492 Ok(Py::new(
493 py,
494 PyClassInitializer::from(HydroflowSink {
495 underlying: arc.clone(),
496 })
497 .add_subclass(CustomClientPort { underlying: arc }),
498 )?
499 .into_py(py))
500 }
501}
502
503#[pyclass(extends=HydroflowSink, subclass)]
504#[derive(Clone)]
505struct CustomClientPort {
506 underlying: Arc<core::custom_service::CustomClientPort>,
507}
508
509#[pymethods]
510impl CustomClientPort {
511 fn send_to(&self, to: &HydroflowSink) {
512 self.underlying.send_to(to.underlying.deref());
513 }
514
515 fn tagged(&self, tag: u32) -> TaggedSource {
516 TaggedSource {
517 underlying: Arc::new(core::rust_crate::ports::TaggedSource {
518 source: self.underlying.clone(),
519 tag,
520 }),
521 }
522 }
523
524 fn server_port<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
525 let underlying = self.underlying.clone();
526 interruptible_future_to_py(py, async move {
527 Ok(ServerPort {
528 underlying: underlying.server_port().await,
529 })
530 })
531 }
532}
533
534#[pyclass(extends=Service, subclass)]
535struct HydroflowCrate {
536 underlying: Arc<RwLock<core::rust_crate::RustCrateService>>,
537}
538
539#[pymethods]
540impl HydroflowCrate {
541 fn stdout<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
542 let underlying = self.underlying.clone();
543 interruptible_future_to_py(py, async move {
544 let underlying = underlying.read().await;
545 Ok(PyReceiver {
546 receiver: Arc::new(Mutex::new(underlying.stdout())),
547 })
548 })
549 }
550
551 fn stderr<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
552 let underlying = self.underlying.clone();
553 interruptible_future_to_py(py, async move {
554 let underlying = underlying.read().await;
555 Ok(PyReceiver {
556 receiver: Arc::new(Mutex::new(underlying.stderr())),
557 })
558 })
559 }
560
561 fn exit_code<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
562 let underlying = self.underlying.clone();
563 interruptible_future_to_py(py, async move {
564 let underlying = underlying.read().await;
565 Ok(underlying.exit_code())
566 })
567 }
568
569 #[getter]
570 fn ports(&self) -> HydroflowCratePorts {
571 HydroflowCratePorts {
572 underlying: self.underlying.clone(),
573 }
574 }
575}
576
577#[pyclass]
578#[derive(Clone)]
579struct HydroflowCratePorts {
580 underlying: Arc<RwLock<core::rust_crate::RustCrateService>>,
581}
582
583#[pymethods]
584impl HydroflowCratePorts {
585 fn __getattribute__(&self, name: String, py: Python<'_>) -> PyResult<Py<PyAny>> {
586 let arc = Arc::new(
587 self.underlying
588 .try_read()
589 .unwrap()
590 .get_port(name, &self.underlying),
591 );
592
593 Ok(Py::new(
594 py,
595 PyClassInitializer::from(HydroflowSink {
596 underlying: arc.clone(),
597 })
598 .add_subclass(HydroflowCratePort { underlying: arc }),
599 )?
600 .into_py(py))
601 }
602}
603
604#[pyclass(extends=HydroflowSink, subclass)]
605#[derive(Clone)]
606struct HydroflowCratePort {
607 underlying: Arc<core::rust_crate::ports::RustCratePortConfig>,
608}
609
610#[pymethods]
611impl HydroflowCratePort {
612 fn merge(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
613 let arc = Arc::new(self.underlying.clone().merge());
614
615 Ok(Py::new(
616 py,
617 PyClassInitializer::from(HydroflowSink {
618 underlying: arc.clone(),
619 })
620 .add_subclass(HydroflowCratePort { underlying: arc }),
621 )?
622 .into_py(py))
623 }
624
625 fn send_to(&self, to: &HydroflowSink) {
626 self.underlying.send_to(to.underlying.deref());
627 }
628
629 fn tagged(&self, tag: u32) -> TaggedSource {
630 TaggedSource {
631 underlying: Arc::new(core::rust_crate::ports::TaggedSource {
632 source: self.underlying.clone(),
633 tag,
634 }),
635 }
636 }
637}
638
639#[pyfunction]
640fn demux(mapping: &PyDict) -> HydroflowSink {
641 HydroflowSink {
642 underlying: Arc::new(core::rust_crate::ports::DemuxSink {
643 demux: mapping
644 .into_iter()
645 .map(|(k, v)| {
646 let k = k.extract::<u32>().unwrap();
647 let v = v.extract::<HydroflowSink>().unwrap();
648 (k, v.underlying)
649 })
650 .collect(),
651 }),
652 }
653}
654
655#[pyclass(subclass)]
656#[derive(Clone)]
657struct TaggedSource {
658 underlying: Arc<core::rust_crate::ports::TaggedSource>,
659}
660
661#[pymethods]
662impl TaggedSource {
663 fn send_to(&self, to: &HydroflowSink) {
664 self.underlying.send_to(to.underlying.deref());
665 }
666
667 fn tagged(&self, tag: u32) -> TaggedSource {
668 TaggedSource {
669 underlying: Arc::new(core::rust_crate::ports::TaggedSource {
670 source: self.underlying.clone(),
671 tag,
672 }),
673 }
674 }
675}
676
677#[pyclass(extends=HydroflowSink, subclass)]
678#[derive(Clone)]
679struct HydroflowNull {
680 underlying: Arc<core::rust_crate::ports::NullSourceSink>,
681}
682
683#[pymethods]
684impl HydroflowNull {
685 fn send_to(&self, to: &HydroflowSink) {
686 self.underlying.send_to(to.underlying.deref());
687 }
688
689 fn tagged(&self, tag: u32) -> TaggedSource {
690 TaggedSource {
691 underlying: Arc::new(core::rust_crate::ports::TaggedSource {
692 source: self.underlying.clone(),
693 tag,
694 }),
695 }
696 }
697}
698
699#[pyfunction]
700fn null(py: Python<'_>) -> PyResult<Py<PyAny>> {
701 let arc = Arc::new(core::rust_crate::ports::NullSourceSink);
702
703 Ok(Py::new(
704 py,
705 PyClassInitializer::from(HydroflowSink {
706 underlying: arc.clone(),
707 })
708 .add_subclass(HydroflowNull { underlying: arc }),
709 )?
710 .into_py(py))
711}
712
// Python handle to a `hydro_deploy_integration::ServerPort`; its methods serialize the port
// description or connect to it as a client.
#[pyclass]
struct ServerPort {
    underlying: hydro_deploy_integration::ServerPort,
}
717
718fn with_tokio_runtime<T>(f: impl Fn() -> T) -> T {
719 let runtime_read = TOKIO_RUNTIME.read().unwrap();
720 let _guard = runtime_read.as_ref().unwrap().enter();
721 f()
722}
723
724#[pymethods]
725impl ServerPort {
726 fn json(&self, py: Python<'_>) -> Py<PyAny> {
727 pythonize(py, &self.underlying).unwrap()
728 }
729
730 #[expect(clippy::wrong_self_convention, reason = "pymethods")]
731 fn into_source<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
732 let realized = with_tokio_runtime(|| Connection::AsClient(self.underlying.connect()));
733
734 interruptible_future_to_py(py, async move {
735 Ok(PythonStream {
736 underlying: Arc::new(RwLock::new(
737 realized.connect::<ConnectedDirect>().await.into_source(),
738 )),
739 })
740 })
741 }
742
743 #[expect(clippy::wrong_self_convention, reason = "pymethods")]
744 fn into_sink<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
745 let realized = with_tokio_runtime(|| Connection::AsClient(self.underlying.connect()));
746
747 interruptible_future_to_py(py, async move {
748 Ok(PythonSink {
749 underlying: Arc::new(RwLock::new(
750 realized.connect::<ConnectedDirect>().await.into_sink(),
751 )),
752 })
753 })
754 }
755}
756
757#[pyclass]
758#[derive(Clone)]
759struct PythonSink {
760 underlying: Arc<RwLock<DynSink<Bytes>>>,
761}
762
763#[pymethods]
764impl PythonSink {
765 fn send<'p>(&self, data: Py<PyBytes>, py: Python<'p>) -> PyResult<&'p PyAny> {
766 let underlying = self.underlying.clone();
767 let bytes = Bytes::from(data.as_bytes(py).to_vec());
768 interruptible_future_to_py(py, async move {
769 underlying.write().await.send(bytes).await?;
770 Ok(())
771 })
772 }
773}
774
775#[pyclass]
776#[derive(Clone)]
777struct PythonStream {
778 underlying: Arc<RwLock<DynStream>>,
779}
780
781#[pymethods]
782impl PythonStream {
783 fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
784 slf
785 }
786
787 fn __anext__<'p>(&self, py: Python<'p>) -> Option<&'p PyAny> {
788 let underlying = self.underlying.clone();
789 Some(
790 interruptible_future_to_py(py, async move {
791 let read_res = underlying.write().await.next().await;
792 read_res
793 .and_then(|b| b.ok().map(|b| b.to_vec()))
794 .map(Ok)
795 .unwrap_or(Err(PyStopAsyncIteration::new_err(())))
796 })
797 .unwrap(),
798 )
799 }
800}
801
802#[pymodule]
803pub fn _core(py: Python<'_>, module: &PyModule) -> PyResult<()> {
804 unsafe {
805 pyo3::ffi::PyEval_InitThreads();
806 }
807
808 CONVERTERS_MODULE
809 .set(
810 PyModule::from_code(
811 py,
812 "
813import asyncio
814async def coroutine_to_safely_cancellable(c, cancel_token):
815 try:
816 return await asyncio.shield(c)
817 except asyncio.CancelledError:
818 cancel_token.safe_cancel()
819 await c
820 raise asyncio.CancelledError()
821",
822 "converters",
823 "converters",
824 )?
825 .into(),
826 )
827 .unwrap();
828
829 *TOKIO_RUNTIME.write().unwrap() = Some(tokio::runtime::Runtime::new().unwrap());
830 let atexit = PyModule::import(py, "atexit")?;
831 atexit.call_method1("register", (wrap_pyfunction!(cleanup_runtime, module)?,))?;
832
833 module.add("AnyhowError", py.get_type::<AnyhowError>())?;
834 module.add_class::<AnyhowWrapper>()?;
835
836 module.add_class::<HydroflowSink>()?;
837 module.add_class::<Deployment>()?;
838
839 module.add_class::<Host>()?;
840 module.add_class::<LocalhostHost>()?;
841
842 module.add_class::<GcpNetwork>()?;
843 module.add_class::<GcpComputeEngineHost>()?;
844
845 module.add_class::<Service>()?;
846 module.add_class::<CustomService>()?;
847 module.add_class::<CustomClientPort>()?;
848 module.add_class::<HydroflowCrate>()?;
849 module.add_class::<HydroflowCratePort>()?;
850
851 module.add_class::<ServerPort>()?;
852 module.add_class::<PythonSink>()?;
853 module.add_class::<PythonStream>()?;
854
855 module.add_function(wrap_pyfunction!(demux, module)?)?;
856 module.add_function(wrap_pyfunction!(null, module)?)?;
857
858 module.add_wrapped(wrap_pymodule!(cli::cli))?;
859
860 Ok(())
861}