1#![expect(
2 unused_qualifications,
3 non_local_definitions,
4 unsafe_op_in_unsafe_fn,
5 reason = "for pyo3 generated code"
6)]
7
8use core::rust_crate::ports::RustCrateSource;
9use std::cell::OnceCell;
10use std::collections::HashMap;
11use std::ops::Deref;
12use std::pin::Pin;
13use std::sync::{Arc, OnceLock};
14
15use bytes::Bytes;
16use futures::{Future, SinkExt, StreamExt};
17use hydro_deploy_integration::{
18 ConnectedDirect, ConnectedSink, ConnectedSource, Connection, DynSink, DynStream,
19};
20use pyo3::exceptions::{PyException, PyStopAsyncIteration};
21use pyo3::prelude::*;
22use pyo3::types::{PyBytes, PyDict};
23use pyo3::{create_exception, wrap_pymodule};
24use pyo3_asyncio::TaskLocals;
25use pythonize::pythonize;
26use tokio::sync::oneshot::Sender;
27use tokio::sync::{Mutex, RwLock};
28
29mod cli;
30use hydro_deploy::ssh::LaunchedSshHost;
31use hydro_deploy::{self as core};
32
33static TOKIO_RUNTIME: std::sync::RwLock<Option<tokio::runtime::Runtime>> =
34 std::sync::RwLock::new(None);
35
36#[pyfunction]
37fn cleanup_runtime() {
38 drop(TOKIO_RUNTIME.write().unwrap().take());
39}
40
41struct TokioRuntime {}
42
43impl pyo3_asyncio::generic::Runtime for TokioRuntime {
44 type JoinError = tokio::task::JoinError;
45 type JoinHandle = tokio::task::JoinHandle<()>;
46
47 fn spawn<F>(fut: F) -> Self::JoinHandle
48 where
49 F: Future<Output = ()> + Send + 'static,
50 {
51 TOKIO_RUNTIME
52 .read()
53 .unwrap()
54 .as_ref()
55 .unwrap()
56 .spawn(async move {
57 fut.await;
58 })
59 }
60}
61
62tokio::task_local! {
63 static TASK_LOCALS: OnceCell<TaskLocals>;
64}
65
66impl pyo3_asyncio::generic::ContextExt for TokioRuntime {
67 fn scope<F, R>(locals: TaskLocals, fut: F) -> Pin<Box<dyn Future<Output = R> + Send>>
68 where
69 F: Future<Output = R> + Send + 'static,
70 {
71 let cell = OnceCell::new();
72 cell.set(locals).unwrap();
73
74 Box::pin(TASK_LOCALS.scope(cell, fut))
75 }
76
77 fn get_task_locals() -> Option<TaskLocals> {
78 TASK_LOCALS
79 .try_with(|c| c.get().cloned())
80 .unwrap_or_default()
81 }
82}
83
84create_exception!(hydro_cli_core, AnyhowError, PyException);
85
86#[pyclass]
87struct SafeCancelToken {
88 cancel_tx: Option<Sender<()>>,
89}
90
91#[pymethods]
92impl SafeCancelToken {
93 fn safe_cancel(&mut self) {
94 if let Some(token) = self.cancel_tx.take() {
95 let _ = token.send(());
96 }
97 }
98}
99
100static CONVERTERS_MODULE: OnceLock<Py<PyModule>> = OnceLock::new();
101
102fn interruptible_future_to_py<F, T>(py: Python<'_>, fut: F) -> PyResult<&PyAny>
103where
104 F: Future<Output = PyResult<T>> + Send + 'static,
105 T: IntoPy<PyObject>,
106{
107 let module = CONVERTERS_MODULE.get().unwrap().clone().into_ref(py);
108
109 let (cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
110
111 let base_coro = pyo3_asyncio::generic::future_into_py::<TokioRuntime, _, _>(py, async move {
112 tokio::select! {
113 biased;
114 _ = cancel_rx => Ok(None),
115 r = fut => r.map(|o| Some(o))
116 }
117 })?;
118
119 module.call_method1(
120 "coroutine_to_safely_cancellable",
121 (
122 base_coro,
123 SafeCancelToken {
124 cancel_tx: Some(cancel_tx),
125 },
126 ),
127 )
128}
129
130#[pyclass]
131#[derive(Clone)]
132pub struct AnyhowWrapper {
133 pub underlying: Arc<RwLock<Option<anyhow::Error>>>,
134}
135
136#[pymethods]
137impl AnyhowWrapper {
138 fn __str__(&self) -> PyResult<String> {
139 Ok(format!(
140 "{:?}",
141 self.underlying.try_read().unwrap().as_ref().unwrap()
142 ))
143 }
144}
145
146#[pyclass(subclass)]
147#[derive(Clone)]
148struct HydroflowSink {
149 underlying: Arc<dyn core::rust_crate::ports::RustCrateSink>,
150}
151
152#[pyclass(name = "Deployment")]
153pub struct Deployment {
154 underlying: Arc<RwLock<core::Deployment>>,
155}
156
157#[pymethods]
158impl Deployment {
159 #[new]
160 fn new() -> Self {
161 Deployment {
162 underlying: Arc::new(RwLock::new(core::Deployment::new())),
163 }
164 }
165
166 #[expect(non_snake_case, reason = "pymethods")]
167 fn Localhost(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
168 let arc = self.underlying.blocking_read().Localhost();
169
170 Ok(Py::new(
171 py,
172 PyClassInitializer::from(Host {
173 underlying: arc.clone(),
174 })
175 .add_subclass(LocalhostHost { underlying: arc }),
176 )?
177 .into_py(py))
178 }
179
180 #[expect(non_snake_case, clippy::too_many_arguments, reason = "pymethods")]
181 fn GcpComputeEngineHost(
182 &self,
183 py: Python<'_>,
184 project: String,
185 machine_type: String,
186 image: String,
187 region: String,
188 network: GcpNetwork,
189 user: Option<String>,
190 startup_script: Option<String>,
191 ) -> PyResult<Py<PyAny>> {
192 let arc = self.underlying.blocking_write().add_host(|id| {
193 core::GcpComputeEngineHost::new(
194 id,
195 project,
196 machine_type,
197 image,
198 region,
199 network.underlying,
200 user,
201 startup_script,
202 )
203 });
204
205 Ok(Py::new(
206 py,
207 PyClassInitializer::from(Host {
208 underlying: arc.clone(),
209 })
210 .add_subclass(GcpComputeEngineHost { underlying: arc }),
211 )?
212 .into_py(py))
213 }
214
215 #[expect(non_snake_case, clippy::too_many_arguments, reason = "pymethods")]
216 fn AzureHost(
217 &self,
218 py: Python<'_>,
219 project: String,
220 os_type: String, machine_size: String,
222 region: String,
223 image: Option<HashMap<String, String>>,
224 user: Option<String>,
225 ) -> PyResult<Py<PyAny>> {
226 let arc = self.underlying.blocking_write().add_host(|id| {
227 core::AzureHost::new(id, project, os_type, machine_size, image, region, user)
228 });
229
230 Ok(Py::new(
231 py,
232 PyClassInitializer::from(Host {
233 underlying: arc.clone(),
234 })
235 .add_subclass(AzureHost { underlying: arc }),
236 )?
237 .into_py(py))
238 }
239
240 #[expect(non_snake_case, reason = "pymethods")]
241 fn CustomService(
242 &self,
243 py: Python<'_>,
244 on: &Host,
245 external_ports: Vec<u16>,
246 ) -> PyResult<Py<PyAny>> {
247 let service = self
248 .underlying
249 .blocking_write()
250 .CustomService(on.underlying.clone(), external_ports);
251
252 Ok(Py::new(
253 py,
254 PyClassInitializer::from(Service {
255 underlying: service.clone(),
256 })
257 .add_subclass(CustomService {
258 underlying: service,
259 }),
260 )?
261 .into_py(py))
262 }
263
264 #[expect(non_snake_case, clippy::too_many_arguments, reason = "pymethods")]
265 fn HydroflowCrate(
266 &self,
267 py: Python<'_>,
268 src: String,
269 on: &Host,
270 bin: Option<String>,
271 example: Option<String>,
272 profile: Option<String>,
273 features: Option<Vec<String>>,
274 args: Option<Vec<String>>,
275 display_id: Option<String>,
276 external_ports: Option<Vec<u16>>,
277 ) -> PyResult<Py<PyAny>> {
278 let service = self.underlying.blocking_write().add_service(|id| {
279 core::rust_crate::RustCrateService::new(
280 id,
281 src.into(),
282 on.underlying.clone(),
283 bin,
284 example,
285 profile,
286 None, None, false, None, features,
291 args,
292 display_id,
293 external_ports.unwrap_or_default(),
294 )
295 });
296
297 Ok(Py::new(
298 py,
299 PyClassInitializer::from(Service {
300 underlying: service.clone(),
301 })
302 .add_subclass(HydroflowCrate {
303 underlying: service,
304 }),
305 )?
306 .into_py(py))
307 }
308
309 fn deploy<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
310 let underlying = self.underlying.clone();
311 let py_none = py.None();
312 interruptible_future_to_py(py, async move {
313 underlying.write().await.deploy().await.map_err(|e| {
314 AnyhowError::new_err(AnyhowWrapper {
315 underlying: Arc::new(RwLock::new(Some(e))),
316 })
317 })?;
318 Ok(py_none)
319 })
320 }
321
322 fn start<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
323 let underlying = self.underlying.clone();
324 let py_none = py.None();
325 interruptible_future_to_py(py, async move {
326 underlying.write().await.start().await.map_err(|e| {
327 AnyhowError::new_err(AnyhowWrapper {
328 underlying: Arc::new(RwLock::new(Some(e))),
329 })
330 })?;
331 Ok(py_none)
332 })
333 }
334}
335
336#[pyclass(subclass)]
337pub struct Host {
338 underlying: Arc<dyn core::Host>,
339}
340
341#[pyclass(extends=Host, subclass)]
342struct LocalhostHost {
343 underlying: Arc<core::LocalhostHost>,
344}
345
346#[pymethods]
347impl LocalhostHost {
348 fn client_only(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
349 let arc = Arc::new(self.underlying.client_only());
350
351 Ok(Py::new(
352 py,
353 PyClassInitializer::from(Host {
354 underlying: arc.clone(),
355 })
356 .add_subclass(LocalhostHost { underlying: arc }),
357 )?
358 .into_py(py))
359 }
360}
361
362#[pyclass]
363#[derive(Clone)]
364struct GcpNetwork {
365 underlying: Arc<RwLock<core::gcp::GcpNetwork>>,
366}
367
368#[pymethods]
369impl GcpNetwork {
370 #[new]
371 fn new(project: String, existing: Option<String>) -> Self {
372 GcpNetwork {
373 underlying: Arc::new(RwLock::new(core::gcp::GcpNetwork::new(project, existing))),
374 }
375 }
376}
377
378#[pyclass(extends=Host, subclass)]
379struct GcpComputeEngineHost {
380 underlying: Arc<core::GcpComputeEngineHost>,
381}
382
383#[pymethods]
384impl GcpComputeEngineHost {
385 #[getter]
386 fn internal_ip(&self) -> String {
387 self.underlying.launched.get().unwrap().internal_ip.clone()
388 }
389
390 #[getter]
391 fn external_ip(&self) -> Option<String> {
392 self.underlying.launched.get().unwrap().external_ip.clone()
393 }
394
395 #[getter]
396 fn ssh_key_path(&self) -> String {
397 self.underlying
398 .launched
399 .get()
400 .unwrap()
401 .ssh_key_path()
402 .to_str()
403 .unwrap()
404 .to_string()
405 }
406}
407
408#[pyclass(extends=Host, subclass)]
409struct AzureHost {
410 underlying: Arc<core::AzureHost>,
411}
412
413#[pymethods]
414impl AzureHost {
415 #[getter]
416 fn internal_ip(&self) -> String {
417 self.underlying.launched.get().unwrap().internal_ip.clone()
418 }
419
420 #[getter]
421 fn external_ip(&self) -> Option<String> {
422 self.underlying.launched.get().unwrap().external_ip.clone()
423 }
424
425 #[getter]
426 fn ssh_key_path(&self) -> String {
427 self.underlying
428 .launched
429 .get()
430 .unwrap()
431 .ssh_key_path()
432 .to_str()
433 .unwrap()
434 .to_string()
435 }
436}
437
438#[pyclass(subclass)]
439pub struct Service {
440 underlying: Arc<RwLock<dyn core::Service>>,
441}
442
443#[pymethods]
444impl Service {
445 fn stop<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
446 let underlying = self.underlying.clone();
447 let py_none = py.None();
448 interruptible_future_to_py(py, async move {
449 underlying.write().await.stop().await.unwrap();
450 Ok(py_none)
451 })
452 }
453}
454
455#[pyclass]
456struct PyReceiver {
457 receiver: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<String>>>,
458}
459
460#[pymethods]
461impl PyReceiver {
462 fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
463 slf
464 }
465
466 fn __anext__<'p>(&self, py: Python<'p>) -> Option<&'p PyAny> {
467 let receiver = self.receiver.clone();
468 Some(
469 interruptible_future_to_py(py, async move {
470 receiver
471 .lock()
472 .await
473 .recv()
474 .await
475 .ok_or_else(|| PyStopAsyncIteration::new_err(()))
476 })
477 .unwrap(),
478 )
479 }
480}
481
482#[pyclass(extends=Service, subclass)]
483struct CustomService {
484 underlying: Arc<RwLock<core::CustomService>>,
485}
486
487#[pymethods]
488impl CustomService {
489 fn client_port(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
490 let arc = Arc::new(core::custom_service::CustomClientPort::new(Arc::downgrade(
491 &self.underlying,
492 )));
493
494 Ok(Py::new(
495 py,
496 PyClassInitializer::from(HydroflowSink {
497 underlying: arc.clone(),
498 })
499 .add_subclass(CustomClientPort { underlying: arc }),
500 )?
501 .into_py(py))
502 }
503}
504
505#[pyclass(extends=HydroflowSink, subclass)]
506#[derive(Clone)]
507struct CustomClientPort {
508 underlying: Arc<core::custom_service::CustomClientPort>,
509}
510
511#[pymethods]
512impl CustomClientPort {
513 fn send_to(&self, to: &HydroflowSink) {
514 self.underlying.send_to(to.underlying.deref());
515 }
516
517 fn tagged(&self, tag: u32) -> TaggedSource {
518 TaggedSource {
519 underlying: Arc::new(core::rust_crate::ports::TaggedSource {
520 source: self.underlying.clone(),
521 tag,
522 }),
523 }
524 }
525
526 fn server_port<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
527 let underlying = self.underlying.clone();
528 interruptible_future_to_py(py, async move {
529 Ok(ServerPort {
530 underlying: underlying.server_port().await,
531 })
532 })
533 }
534}
535
536#[pyclass(extends=Service, subclass)]
537struct HydroflowCrate {
538 underlying: Arc<RwLock<core::rust_crate::RustCrateService>>,
539}
540
541#[pymethods]
542impl HydroflowCrate {
543 fn stdout<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
544 let underlying = self.underlying.clone();
545 interruptible_future_to_py(py, async move {
546 let underlying = underlying.read().await;
547 Ok(PyReceiver {
548 receiver: Arc::new(Mutex::new(underlying.stdout())),
549 })
550 })
551 }
552
553 fn stderr<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
554 let underlying = self.underlying.clone();
555 interruptible_future_to_py(py, async move {
556 let underlying = underlying.read().await;
557 Ok(PyReceiver {
558 receiver: Arc::new(Mutex::new(underlying.stderr())),
559 })
560 })
561 }
562
563 fn exit_code<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
564 let underlying = self.underlying.clone();
565 interruptible_future_to_py(py, async move {
566 let underlying = underlying.read().await;
567 Ok(underlying.exit_code())
568 })
569 }
570
571 #[getter]
572 fn ports(&self) -> HydroflowCratePorts {
573 HydroflowCratePorts {
574 underlying: self.underlying.clone(),
575 }
576 }
577}
578
579#[pyclass]
580#[derive(Clone)]
581struct HydroflowCratePorts {
582 underlying: Arc<RwLock<core::rust_crate::RustCrateService>>,
583}
584
585#[pymethods]
586impl HydroflowCratePorts {
587 fn __getattribute__(&self, name: String, py: Python<'_>) -> PyResult<Py<PyAny>> {
588 let arc = Arc::new(
589 self.underlying
590 .try_read()
591 .unwrap()
592 .get_port(name, &self.underlying),
593 );
594
595 Ok(Py::new(
596 py,
597 PyClassInitializer::from(HydroflowSink {
598 underlying: arc.clone(),
599 })
600 .add_subclass(HydroflowCratePort { underlying: arc }),
601 )?
602 .into_py(py))
603 }
604}
605
606#[pyclass(extends=HydroflowSink, subclass)]
607#[derive(Clone)]
608struct HydroflowCratePort {
609 underlying: Arc<core::rust_crate::ports::RustCratePortConfig>,
610}
611
612#[pymethods]
613impl HydroflowCratePort {
614 fn merge(&self, py: Python<'_>) -> PyResult<Py<PyAny>> {
615 let arc = Arc::new(self.underlying.clone().merge());
616
617 Ok(Py::new(
618 py,
619 PyClassInitializer::from(HydroflowSink {
620 underlying: arc.clone(),
621 })
622 .add_subclass(HydroflowCratePort { underlying: arc }),
623 )?
624 .into_py(py))
625 }
626
627 fn send_to(&self, to: &HydroflowSink) {
628 self.underlying.send_to(to.underlying.deref());
629 }
630
631 fn tagged(&self, tag: u32) -> TaggedSource {
632 TaggedSource {
633 underlying: Arc::new(core::rust_crate::ports::TaggedSource {
634 source: self.underlying.clone(),
635 tag,
636 }),
637 }
638 }
639}
640
641#[pyfunction]
642fn demux(mapping: &PyDict) -> HydroflowSink {
643 HydroflowSink {
644 underlying: Arc::new(core::rust_crate::ports::DemuxSink {
645 demux: mapping
646 .into_iter()
647 .map(|(k, v)| {
648 let k = k.extract::<u32>().unwrap();
649 let v = v.extract::<HydroflowSink>().unwrap();
650 (k, v.underlying)
651 })
652 .collect(),
653 }),
654 }
655}
656
657#[pyclass(subclass)]
658#[derive(Clone)]
659struct TaggedSource {
660 underlying: Arc<core::rust_crate::ports::TaggedSource>,
661}
662
663#[pymethods]
664impl TaggedSource {
665 fn send_to(&self, to: &HydroflowSink) {
666 self.underlying.send_to(to.underlying.deref());
667 }
668
669 fn tagged(&self, tag: u32) -> TaggedSource {
670 TaggedSource {
671 underlying: Arc::new(core::rust_crate::ports::TaggedSource {
672 source: self.underlying.clone(),
673 tag,
674 }),
675 }
676 }
677}
678
679#[pyclass(extends=HydroflowSink, subclass)]
680#[derive(Clone)]
681struct HydroflowNull {
682 underlying: Arc<core::rust_crate::ports::NullSourceSink>,
683}
684
685#[pymethods]
686impl HydroflowNull {
687 fn send_to(&self, to: &HydroflowSink) {
688 self.underlying.send_to(to.underlying.deref());
689 }
690
691 fn tagged(&self, tag: u32) -> TaggedSource {
692 TaggedSource {
693 underlying: Arc::new(core::rust_crate::ports::TaggedSource {
694 source: self.underlying.clone(),
695 tag,
696 }),
697 }
698 }
699}
700
701#[pyfunction]
702fn null(py: Python<'_>) -> PyResult<Py<PyAny>> {
703 let arc = Arc::new(core::rust_crate::ports::NullSourceSink);
704
705 Ok(Py::new(
706 py,
707 PyClassInitializer::from(HydroflowSink {
708 underlying: arc.clone(),
709 })
710 .add_subclass(HydroflowNull { underlying: arc }),
711 )?
712 .into_py(py))
713}
714
715#[pyclass]
716struct ServerPort {
717 underlying: hydro_deploy_integration::ServerPort,
718}
719
720fn with_tokio_runtime<T>(f: impl Fn() -> T) -> T {
721 let runtime_read = TOKIO_RUNTIME.read().unwrap();
722 let _guard = runtime_read.as_ref().unwrap().enter();
723 f()
724}
725
726#[pymethods]
727impl ServerPort {
728 fn json(&self, py: Python<'_>) -> Py<PyAny> {
729 pythonize(py, &self.underlying).unwrap()
730 }
731
732 #[expect(clippy::wrong_self_convention, reason = "pymethods")]
733 fn into_source<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
734 let realized = with_tokio_runtime(|| Connection::AsClient(self.underlying.connect()));
735
736 interruptible_future_to_py(py, async move {
737 Ok(PythonStream {
738 underlying: Arc::new(RwLock::new(
739 realized.connect::<ConnectedDirect>().await.into_source(),
740 )),
741 })
742 })
743 }
744
745 #[expect(clippy::wrong_self_convention, reason = "pymethods")]
746 fn into_sink<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
747 let realized = with_tokio_runtime(|| Connection::AsClient(self.underlying.connect()));
748
749 interruptible_future_to_py(py, async move {
750 Ok(PythonSink {
751 underlying: Arc::new(RwLock::new(
752 realized.connect::<ConnectedDirect>().await.into_sink(),
753 )),
754 })
755 })
756 }
757}
758
759#[pyclass]
760#[derive(Clone)]
761struct PythonSink {
762 underlying: Arc<RwLock<DynSink<Bytes>>>,
763}
764
765#[pymethods]
766impl PythonSink {
767 fn send<'p>(&self, data: Py<PyBytes>, py: Python<'p>) -> PyResult<&'p PyAny> {
768 let underlying = self.underlying.clone();
769 let bytes = Bytes::from(data.as_bytes(py).to_vec());
770 interruptible_future_to_py(py, async move {
771 underlying.write().await.send(bytes).await?;
772 Ok(())
773 })
774 }
775}
776
777#[pyclass]
778#[derive(Clone)]
779struct PythonStream {
780 underlying: Arc<RwLock<DynStream>>,
781}
782
783#[pymethods]
784impl PythonStream {
785 fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
786 slf
787 }
788
789 fn __anext__<'p>(&self, py: Python<'p>) -> Option<&'p PyAny> {
790 let underlying = self.underlying.clone();
791 Some(
792 interruptible_future_to_py(py, async move {
793 let read_res = underlying.write().await.next().await;
794 read_res
795 .and_then(|b| b.ok().map(|b| b.to_vec()))
796 .map(Ok)
797 .unwrap_or(Err(PyStopAsyncIteration::new_err(())))
798 })
799 .unwrap(),
800 )
801 }
802}
803
804#[pymodule]
805pub fn _core(py: Python<'_>, module: &PyModule) -> PyResult<()> {
806 unsafe {
807 pyo3::ffi::PyEval_InitThreads();
808 }
809
810 CONVERTERS_MODULE
811 .set(
812 PyModule::from_code(
813 py,
814 "
815import asyncio
816async def coroutine_to_safely_cancellable(c, cancel_token):
817 try:
818 return await asyncio.shield(c)
819 except asyncio.CancelledError:
820 cancel_token.safe_cancel()
821 await c
822 raise asyncio.CancelledError()
823",
824 "converters",
825 "converters",
826 )?
827 .into(),
828 )
829 .unwrap();
830
831 *TOKIO_RUNTIME.write().unwrap() = Some(tokio::runtime::Runtime::new().unwrap());
832 let atexit = PyModule::import(py, "atexit")?;
833 atexit.call_method1("register", (wrap_pyfunction!(cleanup_runtime, module)?,))?;
834
835 module.add("AnyhowError", py.get_type::<AnyhowError>())?;
836 module.add_class::<AnyhowWrapper>()?;
837
838 module.add_class::<HydroflowSink>()?;
839 module.add_class::<Deployment>()?;
840
841 module.add_class::<Host>()?;
842 module.add_class::<LocalhostHost>()?;
843
844 module.add_class::<GcpNetwork>()?;
845 module.add_class::<GcpComputeEngineHost>()?;
846
847 module.add_class::<Service>()?;
848 module.add_class::<CustomService>()?;
849 module.add_class::<CustomClientPort>()?;
850 module.add_class::<HydroflowCrate>()?;
851 module.add_class::<HydroflowCratePort>()?;
852
853 module.add_class::<ServerPort>()?;
854 module.add_class::<PythonSink>()?;
855 module.add_class::<PythonStream>()?;
856
857 module.add_function(wrap_pyfunction!(demux, module)?)?;
858 module.add_function(wrap_pyfunction!(null, module)?)?;
859
860 module.add_wrapped(wrap_pymodule!(cli::cli))?;
861
862 Ok(())
863}