1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11 BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12 RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
20use hydro_deploy::{LinuxCompileType, RustCrate};
21use nanoid::nanoid;
22use proc_macro2::Span;
23use sinktools::lazy::LazySink;
24use stageleft::QuotedWithContext;
25use syn::parse_quote;
26use tar::{Builder, Header};
27use tokio::net::TcpStream;
28use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
29use tracing::{Instrument, instrument, trace, warn};
30
31use super::deploy_runtime_containerized::*;
32use crate::compile::builder::ExternalPortId;
33use crate::compile::deploy::DeployResult;
34use crate::compile::deploy_provider::{
35 ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
36};
37use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
38use crate::location::dynamic::LocationId;
39use crate::location::member_id::TaglessMemberId;
40use crate::location::{LocationKey, MembershipEvent, NetworkHint};
41
42#[derive(Clone, Debug)]
44pub struct DockerNetwork {
45 name: String,
46}
47
48impl DockerNetwork {
49 pub fn new(name: String) -> Self {
51 Self {
52 name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
53 }
54 }
55}
56
57#[derive(Clone)]
59pub struct DockerDeployProcess {
60 key: LocationKey,
61 name: String,
62 next_port: Rc<RefCell<u16>>,
63 rust_crate: Rc<RefCell<Option<RustCrate>>>,
64
65 exposed_ports: Rc<RefCell<Vec<u16>>>,
66
67 docker_container_name: Rc<RefCell<Option<String>>>,
68
69 compilation_options: Option<String>,
70
71 config: Vec<String>,
72
73 network: DockerNetwork,
74}
75
76impl Node for DockerDeployProcess {
77 type Port = u16;
78 type Meta = ();
79 type InstantiateEnv = DockerDeploy;
80
81 #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
82 fn next_port(&self) -> Self::Port {
83 let port = {
84 let mut borrow = self.next_port.borrow_mut();
85 let port = *borrow;
86 *borrow += 1;
87 port
88 };
89
90 port
91 }
92
93 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
94 fn update_meta(&self, _meta: &Self::Meta) {}
95
96 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
97 fn instantiate(
98 &self,
99 _env: &mut Self::InstantiateEnv,
100 meta: &mut Self::Meta,
101 graph: DfirGraph,
102 extra_stmts: &[syn::Stmt],
103 sidecars: &[syn::Expr],
104 ) {
105 let (bin_name, config) = create_graph_trybuild(
106 graph,
107 extra_stmts,
108 sidecars,
109 Some(&self.name),
110 crate::compile::trybuild::generate::DeployMode::Containerized,
111 LinkingMode::Static,
112 );
113
114 let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
115 .target_dir(config.target_dir)
116 .example(bin_name)
117 .no_default_features();
118
119 ret = ret.display_name("test_display_name");
120
121 ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
122
123 if let Some(features) = config.features {
124 ret = ret.features(features);
125 }
126
127 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
128 ret = ret.config("build.incremental = false");
129
130 *self.rust_crate.borrow_mut() = Some(ret);
131 }
132}
133
134#[derive(Clone)]
136pub struct DockerDeployCluster {
137 key: LocationKey,
138 name: String,
139 next_port: Rc<RefCell<u16>>,
140 rust_crate: Rc<RefCell<Option<RustCrate>>>,
141
142 docker_container_name: Rc<RefCell<Vec<String>>>,
143
144 compilation_options: Option<String>,
145
146 config: Vec<String>,
147
148 count: usize,
149}
150
151impl Node for DockerDeployCluster {
152 type Port = u16;
153 type Meta = ();
154 type InstantiateEnv = DockerDeploy;
155
156 #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
157 fn next_port(&self) -> Self::Port {
158 let port = {
159 let mut borrow = self.next_port.borrow_mut();
160 let port = *borrow;
161 *borrow += 1;
162 port
163 };
164
165 port
166 }
167
168 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
169 fn update_meta(&self, _meta: &Self::Meta) {}
170
171 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, extra_stmts = extra_stmts.len()))]
172 fn instantiate(
173 &self,
174 _env: &mut Self::InstantiateEnv,
175 _meta: &mut Self::Meta,
176 graph: DfirGraph,
177 extra_stmts: &[syn::Stmt],
178 sidecars: &[syn::Expr],
179 ) {
180 let (bin_name, config) = create_graph_trybuild(
181 graph,
182 extra_stmts,
183 sidecars,
184 Some(&self.name),
185 crate::compile::trybuild::generate::DeployMode::Containerized,
186 LinkingMode::Static,
187 );
188
189 let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
190 .target_dir(config.target_dir)
191 .example(bin_name)
192 .no_default_features();
193
194 ret = ret.display_name("test_display_name");
195
196 ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
197
198 if let Some(features) = config.features {
199 ret = ret.features(features);
200 }
201
202 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
203 ret = ret.config("build.incremental = false");
204
205 *self.rust_crate.borrow_mut() = Some(ret);
206 }
207}
208
209#[derive(Clone, Debug)]
211pub struct DockerDeployExternal {
212 name: String,
213 next_port: Rc<RefCell<u16>>,
214
215 ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,
216
217 #[expect(clippy::type_complexity, reason = "internal code")]
218 connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
219}
220
221impl Node for DockerDeployExternal {
222 type Port = u16;
223 type Meta = ();
224 type InstantiateEnv = DockerDeploy;
225
226 #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
227 fn next_port(&self) -> Self::Port {
228 let port = {
229 let mut borrow = self.next_port.borrow_mut();
230 let port = *borrow;
231 *borrow += 1;
232 port
233 };
234
235 port
236 }
237
238 #[instrument(level = "trace", skip_all, fields(name = self.name))]
239 fn update_meta(&self, _meta: &Self::Meta) {}
240
241 #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
242 fn instantiate(
243 &self,
244 _env: &mut Self::InstantiateEnv,
245 meta: &mut Self::Meta,
246 graph: DfirGraph,
247 extra_stmts: &[syn::Stmt],
248 sidecars: &[syn::Expr],
249 ) {
250 trace!(name: "surface", surface = graph.surface_syntax_string());
251 }
252}
253
254type DynSourceSink<Out, In, InErr> = (
255 Pin<Box<dyn Stream<Item = Out>>>,
256 Pin<Box<dyn Sink<In, Error = InErr>>>,
257);
258
259impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
260 #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
261 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
262 self.ports.borrow_mut().insert(external_port_id, port);
263 }
264
265 fn as_bytes_bidi(
266 &self,
267 external_port_id: ExternalPortId,
268 ) -> impl Future<
269 Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
270 > + 'a {
271 let guard =
272 tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered();
273
274 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
275 let (docker_container_name, remote_port, _) = self
276 .connection_info
277 .borrow()
278 .get(&local_port)
279 .unwrap()
280 .clone();
281
282 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
283
284 async move {
285 let local_port =
286 find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
287 let remote_ip_address = "localhost";
288
289 trace!(name: "as_bytes_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
290
291 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
292 .await
293 .unwrap();
294
295 trace!(name: "as_bytes_bidi_connected", to = %remote_ip_address, to_port = %local_port);
296
297 let (rx, tx) = stream.into_split();
298
299 let source = Box::pin(
300 FramedRead::new(rx, LengthDelimitedCodec::new()),
301 ) as Pin<Box<dyn Stream<Item = Result<bytes::BytesMut, std::io::Error>>>>;
302
303 let sink = Box::pin(FramedWrite::new(tx, LengthDelimitedCodec::new()))
304 as Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>;
305
306 (source, sink)
307 }
308 .instrument(guard.exit())
309 }
310
311 fn as_bincode_bidi<InT, OutT>(
312 &self,
313 external_port_id: ExternalPortId,
314 ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
315 where
316 InT: serde::Serialize + 'static,
317 OutT: serde::de::DeserializeOwned + 'static,
318 {
319 let guard =
320 tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered();
321
322 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
323 let (docker_container_name, remote_port, _) = self
324 .connection_info
325 .borrow()
326 .get(&local_port)
327 .unwrap()
328 .clone();
329
330 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
331
332 async move {
333 let local_port =
334 find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
335 let remote_ip_address = "localhost";
336
337 trace!(name: "as_bincode_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
338
339 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
340 .await
341 .unwrap();
342
343 trace!(name: "as_bincode_bidi_connected", to = %remote_ip_address, to_port = %local_port);
344
345 let (rx, tx) = stream.into_split();
346
347 let source = Box::pin(
348 FramedRead::new(rx, LengthDelimitedCodec::new())
349 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
350 ) as Pin<Box<dyn Stream<Item = OutT>>>;
351
352 let sink = Box::pin(
353 FramedWrite::new(tx, LengthDelimitedCodec::new()).with(move |v: InT| async move {
354 Ok::<_, std::io::Error>(Bytes::from(bincode::serialize(&v).unwrap()))
355 }),
356 ) as Pin<Box<dyn Sink<InT, Error = std::io::Error>>>;
357
358 (source, sink)
359 }
360 .instrument(guard.exit())
361 }
362
363 fn as_bincode_sink<T>(
364 &self,
365 external_port_id: ExternalPortId,
366 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
367 where
368 T: serde::Serialize + 'static,
369 {
370 let guard =
371 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
372
373 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
374 let (docker_container_name, remote_port, _) = self
375 .connection_info
376 .borrow()
377 .get(&local_port)
378 .unwrap()
379 .clone();
380
381 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
382
383 async move {
384 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
385 let remote_ip_address = "localhost";
386
387 Box::pin(
388 LazySink::new(move || {
389 Box::pin(async move {
390 trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
391
392 let stream =
393 TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
394 .await?;
395
396 trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
397
398 Result::<_, std::io::Error>::Ok(FramedWrite::new(
399 stream,
400 LengthDelimitedCodec::new(),
401 ))
402 })
403 })
404 .with(move |v| async move {
405 Ok(Bytes::from(bincode::serialize(&v).unwrap()))
406 }),
407 ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
408 }
409 .instrument(guard.exit())
410 }
411
412 fn as_bincode_source<T>(
413 &self,
414 external_port_id: ExternalPortId,
415 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
416 where
417 T: serde::de::DeserializeOwned + 'static,
418 {
419 let guard =
420 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
421
422 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
423 let (docker_container_name, remote_port, _) = self
424 .connection_info
425 .borrow()
426 .get(&local_port)
427 .unwrap()
428 .clone();
429
430 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
431
432 async move {
433
434 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
435 let remote_ip_address = "localhost";
436
437 trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
438
439 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
440 .await
441 .unwrap();
442
443 trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
444
445 Box::pin(
446 FramedRead::new(stream, LengthDelimitedCodec::new())
447 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
448 ) as Pin<Box<dyn Stream<Item = T>>>
449 }
450 .instrument(guard.exit())
451 }
452}
453
454#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
455async fn find_dynamically_allocated_docker_port(
456 docker_container_name: &str,
457 destination_port: u16,
458) -> u16 {
459 let docker = Docker::connect_with_local_defaults().unwrap();
460
461 let container_info = docker
462 .inspect_container(docker_container_name, None::<InspectContainerOptions>)
463 .await
464 .unwrap();
465
466 trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
467
468 let remote_port = container_info
470 .network_settings
471 .as_ref()
472 .unwrap()
473 .ports
474 .as_ref()
475 .unwrap()
476 .get(&format!("{destination_port}/tcp"))
477 .unwrap()
478 .as_ref()
479 .unwrap()
480 .iter()
481 .find(|v| v.host_ip == Some("0.0.0.0".to_owned()))
482 .unwrap()
483 .host_port
484 .as_ref()
485 .unwrap()
486 .parse()
487 .unwrap();
488
489 remote_port
490}
491
492pub struct DockerDeploy {
494 docker_processes: Vec<DockerDeployProcessSpec>,
495 docker_clusters: Vec<DockerDeployClusterSpec>,
496 network: DockerNetwork,
497 deployment_instance: String,
498}
499
500#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
501async fn create_and_start_container(
502 docker: &Docker,
503 container_name: &str,
504 image_name: &str,
505 network_name: &str,
506 deployment_instance: &str,
507) -> Result<(), anyhow::Error> {
508 let config = ContainerCreateBody {
509 image: Some(image_name.to_owned()),
510 hostname: Some(container_name.to_owned()),
511 host_config: Some(HostConfig {
512 binds: Some(vec!["/var/run/docker.sock:/var/run/docker.sock".to_owned()]),
513 publish_all_ports: Some(true),
514 port_bindings: Some(HashMap::new()), ..Default::default()
516 }),
517 env: Some(vec![
518 format!("CONTAINER_NAME={container_name}"),
519 format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
520 format!("RUST_LOG=trace"),
521 ]),
522 networking_config: Some(NetworkingConfig {
523 endpoints_config: Some(HashMap::from([(
524 network_name.to_owned(),
525 EndpointSettings {
526 ..Default::default()
527 },
528 )])),
529 }),
530 tty: Some(true),
531 ..Default::default()
532 };
533
534 let options = CreateContainerOptions {
535 name: Some(container_name.to_owned()),
536 ..Default::default()
537 };
538
539 tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
540 docker.create_container(Some(options), config).await?;
541 docker
542 .start_container(container_name, None::<StartContainerOptions>)
543 .await?;
544
545 Ok(())
546}
547
548#[instrument(level = "trace", skip_all, fields(%image_name))]
549async fn build_and_create_image(
550 rust_crate: &Rc<RefCell<Option<RustCrate>>>,
551 compilation_options: Option<&str>,
552 config: &[String],
553 exposed_ports: &[u16],
554 image_name: &str,
555) -> Result<(), anyhow::Error> {
556 let mut rust_crate = rust_crate
557 .borrow_mut()
558 .take()
559 .unwrap()
560 .rustflags(compilation_options.unwrap_or_default());
561
562 for cfg in config {
563 rust_crate = rust_crate.config(cfg);
564 }
565
566 let build_output = match build_crate_memoized(
567 rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(LinuxCompileType::Musl)),
568 )
569 .await
570 {
571 Ok(build_output) => build_output,
572 Err(BuildError::FailedToBuildCrate {
573 exit_status,
574 diagnostics,
575 text_lines,
576 stderr_lines,
577 }) => {
578 let diagnostics = diagnostics
579 .into_iter()
580 .map(|d| d.rendered.unwrap())
581 .collect::<Vec<_>>()
582 .join("\n");
583 let text_lines = text_lines.join("\n");
584 let stderr_lines = stderr_lines.join("\n");
585
586 anyhow::bail!(
587 r#"
588Failed to build crate {exit_status:?}
589--- diagnostics
590---
591{diagnostics}
592---
593---
594---
595
596--- text_lines
597---
598---
599{text_lines}
600---
601---
602---
603
604--- stderr_lines
605---
606---
607{stderr_lines}
608---
609---
610---"#
611 );
612 }
613 Err(err) => {
614 anyhow::bail!("Failed to build crate {err:?}");
615 }
616 };
617
618 let docker = Docker::connect_with_local_defaults()?;
619
620 let mut tar_data = Vec::new();
621 {
622 let mut tar = Builder::new(&mut tar_data);
623
624 let exposed_ports = exposed_ports
625 .iter()
626 .map(|port| format!("EXPOSE {port}/tcp"))
627 .collect::<Vec<_>>()
628 .join("\n");
629
630 let dockerfile_content = format!(
631 r#"
632 FROM scratch
633 {exposed_ports}
634 COPY app /app
635 CMD ["/app"]
636 "#,
637 );
638
639 trace!(name: "dockerfile", %dockerfile_content);
640
641 let mut header = Header::new_gnu();
642 header.set_path("Dockerfile")?;
643 header.set_size(dockerfile_content.len() as u64);
644 header.set_cksum();
645 tar.append(&header, dockerfile_content.as_bytes())?;
646
647 let mut header = Header::new_gnu();
648 header.set_path("app")?;
649 header.set_size(build_output.bin_data.len() as u64);
650 header.set_mode(0o755);
651 header.set_cksum();
652 tar.append(&header, &build_output.bin_data[..])?;
653
654 tar.finish()?;
655 }
656
657 let build_options = BuildImageOptions {
658 dockerfile: "Dockerfile".to_owned(),
659 t: Some(image_name.to_owned()),
660 rm: true,
661 ..Default::default()
662 };
663
664 use bollard::errors::Error;
665
666 let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
667 let mut build_stream = docker.build_image(build_options, None, Some(body));
668 while let Some(msg) = build_stream.next().await {
669 match msg {
670 Ok(_) => {}
671 Err(e) => match e {
672 Error::DockerStreamError { error } => {
673 return Err(anyhow::anyhow!(
674 "Docker build failed: DockerStreamError: {{ error: {error} }}"
675 ));
676 }
677 _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
678 },
679 }
680 }
681
682 Ok(())
683}
684
685impl DockerDeploy {
686 pub fn new(network: DockerNetwork) -> Self {
688 Self {
689 docker_processes: Vec::new(),
690 docker_clusters: Vec::new(),
691 network,
692 deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
693 }
694 }
695
696 pub fn add_localhost_docker(
698 &mut self,
699 compilation_options: Option<String>,
700 config: Vec<String>,
701 ) -> DockerDeployProcessSpec {
702 let process = DockerDeployProcessSpec {
703 compilation_options,
704 config,
705 network: self.network.clone(),
706 deployment_instance: self.deployment_instance.clone(),
707 };
708
709 self.docker_processes.push(process.clone());
710
711 process
712 }
713
714 pub fn add_localhost_docker_cluster(
716 &mut self,
717 compilation_options: Option<String>,
718 config: Vec<String>,
719 count: usize,
720 ) -> DockerDeployClusterSpec {
721 let cluster = DockerDeployClusterSpec {
722 compilation_options,
723 config,
724 count,
725 deployment_instance: self.deployment_instance.clone(),
726 };
727
728 self.docker_clusters.push(cluster.clone());
729
730 cluster
731 }
732
733 pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
735 DockerDeployExternalSpec { name }
736 }
737
738 pub fn get_deployment_instance(&self) -> String {
740 self.deployment_instance.clone()
741 }
742
743 #[instrument(level = "trace", skip_all)]
745 pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
746 for (_, _, process) in nodes.get_all_processes() {
747 let exposed_ports = process.exposed_ports.borrow().clone();
748
749 build_and_create_image(
750 &process.rust_crate,
751 process.compilation_options.as_deref(),
752 &process.config,
753 &exposed_ports,
754 &process.name,
755 )
756 .await?;
757 }
758
759 for (_, _, cluster) in nodes.get_all_clusters() {
760 build_and_create_image(
761 &cluster.rust_crate,
762 cluster.compilation_options.as_deref(),
763 &cluster.config,
764 &[], &cluster.name,
766 )
767 .await?;
768 }
769
770 Ok(())
771 }
772
773 #[instrument(level = "trace", skip_all)]
775 pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
776 let docker = Docker::connect_with_local_defaults()?;
777
778 match docker
779 .create_network(NetworkCreateRequest {
780 name: self.network.name.clone(),
781 driver: Some("bridge".to_owned()),
782 ..Default::default()
783 })
784 .await
785 {
786 Ok(v) => v.id,
787 Err(e) => {
788 panic!("Failed to create docker network: {e:?}");
789 }
790 };
791
792 for (_, _, process) in nodes.get_all_processes() {
793 let docker_container_name: String = get_docker_container_name(&process.name, None);
794 *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
795
796 create_and_start_container(
797 &docker,
798 &docker_container_name,
799 &process.name,
800 &self.network.name,
801 &self.deployment_instance,
802 )
803 .await?;
804 }
805
806 for (_, _, cluster) in nodes.get_all_clusters() {
807 for num in 0..cluster.count {
808 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
809 cluster
810 .docker_container_name
811 .borrow_mut()
812 .push(docker_container_name.clone());
813
814 create_and_start_container(
815 &docker,
816 &docker_container_name,
817 &cluster.name,
818 &self.network.name,
819 &self.deployment_instance,
820 )
821 .await?;
822 }
823 }
824
825 Ok(())
826 }
827
828 #[instrument(level = "trace", skip_all)]
830 pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
831 let docker = Docker::connect_with_local_defaults()?;
832
833 for (_, _, process) in nodes.get_all_processes() {
834 let docker_container_name: String = get_docker_container_name(&process.name, None);
835
836 docker
837 .kill_container(&docker_container_name, None::<KillContainerOptions>)
838 .await?;
839 }
840
841 for (_, _, cluster) in nodes.get_all_clusters() {
842 for num in 0..cluster.count {
843 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
844
845 docker
846 .kill_container(&docker_container_name, None::<KillContainerOptions>)
847 .await?;
848 }
849 }
850
851 Ok(())
852 }
853
854 #[instrument(level = "trace", skip_all)]
856 pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
857 let docker = Docker::connect_with_local_defaults()?;
858
859 for (_, _, process) in nodes.get_all_processes() {
860 let docker_container_name: String = get_docker_container_name(&process.name, None);
861
862 docker
863 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
864 .await?;
865 }
866
867 for (_, _, cluster) in nodes.get_all_clusters() {
868 for num in 0..cluster.count {
869 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
870
871 docker
872 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
873 .await?;
874 }
875 }
876
877 docker
878 .remove_network(&self.network.name)
879 .await
880 .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
881
882 use bollard::query_parameters::RemoveImageOptions;
883
884 for (_, _, process) in nodes.get_all_processes() {
885 docker
886 .remove_image(&process.name, None::<RemoveImageOptions>, None)
887 .await?;
888 }
889
890 for (_, _, cluster) in nodes.get_all_clusters() {
891 docker
892 .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
893 .await?;
894 }
895
896 Ok(())
897 }
898}
899
900impl<'a> Deploy<'a> for DockerDeploy {
901 type Meta = ();
902 type InstantiateEnv = Self;
903
904 type Process = DockerDeployProcess;
905 type Cluster = DockerDeployCluster;
906 type External = DockerDeployExternal;
907
908 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
909 fn o2o_sink_source(
910 _env: &mut Self::InstantiateEnv,
911 p1: &Self::Process,
912 p1_port: &<Self::Process as Node>::Port,
913 p2: &Self::Process,
914 p2_port: &<Self::Process as Node>::Port,
915 name: Option<&str>,
916 networking_info: &crate::networking::NetworkingInfo,
917 ) -> (syn::Expr, syn::Expr) {
918 match networking_info {
919 crate::networking::NetworkingInfo::Tcp {
920 fault: crate::networking::TcpFault::FailStop,
921 } => {}
922 _ => panic!("Unsupported networking info: {:?}", networking_info),
923 }
924
925 deploy_containerized_o2o(
926 &p2.name,
927 name.expect("channel name is required for containerized deployment"),
928 )
929 }
930
931 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
932 fn o2o_connect(
933 p1: &Self::Process,
934 p1_port: &<Self::Process as Node>::Port,
935 p2: &Self::Process,
936 p2_port: &<Self::Process as Node>::Port,
937 ) -> Box<dyn FnOnce()> {
938 let serialized = format!("o2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
939
940 Box::new(move || {
941 trace!(name: "o2o_connect thunk", %serialized);
942 })
943 }
944
945 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
946 fn o2m_sink_source(
947 _env: &mut Self::InstantiateEnv,
948 p1: &Self::Process,
949 p1_port: &<Self::Process as Node>::Port,
950 c2: &Self::Cluster,
951 c2_port: &<Self::Cluster as Node>::Port,
952 name: Option<&str>,
953 networking_info: &crate::networking::NetworkingInfo,
954 ) -> (syn::Expr, syn::Expr) {
955 match networking_info {
956 crate::networking::NetworkingInfo::Tcp {
957 fault: crate::networking::TcpFault::FailStop,
958 } => {}
959 _ => panic!("Unsupported networking info: {:?}", networking_info),
960 }
961
962 deploy_containerized_o2m(
963 name.expect("channel name is required for containerized deployment"),
964 )
965 }
966
967 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
968 fn o2m_connect(
969 p1: &Self::Process,
970 p1_port: &<Self::Process as Node>::Port,
971 c2: &Self::Cluster,
972 c2_port: &<Self::Cluster as Node>::Port,
973 ) -> Box<dyn FnOnce()> {
974 let serialized = format!("o2m_connect {}:{p1_port} -> {}:{c2_port}", p1.name, c2.name);
975
976 Box::new(move || {
977 trace!(name: "o2m_connect thunk", %serialized);
978 })
979 }
980
981 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
982 fn m2o_sink_source(
983 _env: &mut Self::InstantiateEnv,
984 c1: &Self::Cluster,
985 c1_port: &<Self::Cluster as Node>::Port,
986 p2: &Self::Process,
987 p2_port: &<Self::Process as Node>::Port,
988 name: Option<&str>,
989 networking_info: &crate::networking::NetworkingInfo,
990 ) -> (syn::Expr, syn::Expr) {
991 match networking_info {
992 crate::networking::NetworkingInfo::Tcp {
993 fault: crate::networking::TcpFault::FailStop,
994 } => {}
995 _ => panic!("Unsupported networking info: {:?}", networking_info),
996 }
997
998 deploy_containerized_m2o(
999 &p2.name,
1000 name.expect("channel name is required for containerized deployment"),
1001 )
1002 }
1003
1004 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1005 fn m2o_connect(
1006 c1: &Self::Cluster,
1007 c1_port: &<Self::Cluster as Node>::Port,
1008 p2: &Self::Process,
1009 p2_port: &<Self::Process as Node>::Port,
1010 ) -> Box<dyn FnOnce()> {
1011 let serialized = format!("o2m_connect {}:{c1_port} -> {}:{p2_port}", c1.name, p2.name);
1012
1013 Box::new(move || {
1014 trace!(name: "m2o_connect thunk", %serialized);
1015 })
1016 }
1017
1018 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1019 fn m2m_sink_source(
1020 _env: &mut Self::InstantiateEnv,
1021 c1: &Self::Cluster,
1022 c1_port: &<Self::Cluster as Node>::Port,
1023 c2: &Self::Cluster,
1024 c2_port: &<Self::Cluster as Node>::Port,
1025 name: Option<&str>,
1026 networking_info: &crate::networking::NetworkingInfo,
1027 ) -> (syn::Expr, syn::Expr) {
1028 match networking_info {
1029 crate::networking::NetworkingInfo::Tcp {
1030 fault: crate::networking::TcpFault::FailStop,
1031 } => {}
1032 _ => panic!("Unsupported networking info: {:?}", networking_info),
1033 }
1034
1035 deploy_containerized_m2m(
1036 name.expect("channel name is required for containerized deployment"),
1037 )
1038 }
1039
1040 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1041 fn m2m_connect(
1042 c1: &Self::Cluster,
1043 c1_port: &<Self::Cluster as Node>::Port,
1044 c2: &Self::Cluster,
1045 c2_port: &<Self::Cluster as Node>::Port,
1046 ) -> Box<dyn FnOnce()> {
1047 let serialized = format!("m2m_connect {}:{c1_port} -> {}:{c2_port}", c1.name, c2.name);
1048
1049 Box::new(move || {
1050 trace!(name: "m2m_connect thunk", %serialized);
1051 })
1052 }
1053
1054 #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
1055 fn e2o_many_source(
1056 extra_stmts: &mut Vec<syn::Stmt>,
1057 p2: &Self::Process,
1058 p2_port: &<Self::Process as Node>::Port,
1059 codec_type: &syn::Type,
1060 shared_handle: String,
1061 ) -> syn::Expr {
1062 p2.exposed_ports.borrow_mut().push(*p2_port);
1063
1064 let socket_ident = syn::Ident::new(
1065 &format!("__hydro_deploy_many_{}_socket", &shared_handle),
1066 Span::call_site(),
1067 );
1068
1069 let source_ident = syn::Ident::new(
1070 &format!("__hydro_deploy_many_{}_source", &shared_handle),
1071 Span::call_site(),
1072 );
1073
1074 let sink_ident = syn::Ident::new(
1075 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1076 Span::call_site(),
1077 );
1078
1079 let membership_ident = syn::Ident::new(
1080 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
1081 Span::call_site(),
1082 );
1083
1084 let bind_addr = format!("0.0.0.0:{}", p2_port);
1085
1086 extra_stmts.push(syn::parse_quote! {
1087 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1088 });
1089
1090 let root = crate::staging_util::get_this_crate();
1091
1092 extra_stmts.push(syn::parse_quote! {
1093 let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
1094 });
1095
1096 parse_quote!(#source_ident)
1097 }
1098
1099 #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1100 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1101 let sink_ident = syn::Ident::new(
1102 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1103 Span::call_site(),
1104 );
1105 parse_quote!(#sink_ident)
1106 }
1107
1108 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1109 fn e2o_source(
1110 extra_stmts: &mut Vec<syn::Stmt>,
1111 p1: &Self::External,
1112 p1_port: &<Self::External as Node>::Port,
1113 p2: &Self::Process,
1114 p2_port: &<Self::Process as Node>::Port,
1115 _codec_type: &syn::Type,
1116 shared_handle: String,
1117 ) -> syn::Expr {
1118 p1.connection_info.borrow_mut().insert(
1119 *p1_port,
1120 (
1121 p2.docker_container_name.clone(),
1122 *p2_port,
1123 p2.network.clone(),
1124 ),
1125 );
1126
1127 p2.exposed_ports.borrow_mut().push(*p2_port);
1128
1129 let socket_ident = syn::Ident::new(
1130 &format!("__hydro_deploy_{}_socket", &shared_handle),
1131 Span::call_site(),
1132 );
1133
1134 let source_ident = syn::Ident::new(
1135 &format!("__hydro_deploy_{}_source", &shared_handle),
1136 Span::call_site(),
1137 );
1138
1139 let sink_ident = syn::Ident::new(
1140 &format!("__hydro_deploy_{}_sink", &shared_handle),
1141 Span::call_site(),
1142 );
1143
1144 let bind_addr = format!("0.0.0.0:{}", p2_port);
1145
1146 extra_stmts.push(syn::parse_quote! {
1147 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1148 });
1149
1150 let create_expr = deploy_containerized_external_sink_source_ident(socket_ident);
1151
1152 extra_stmts.push(syn::parse_quote! {
1153 let (#sink_ident, #source_ident) = (#create_expr).split();
1154 });
1155
1156 parse_quote!(#source_ident)
1157 }
1158
1159 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1160 fn e2o_connect(
1161 p1: &Self::External,
1162 p1_port: &<Self::External as Node>::Port,
1163 p2: &Self::Process,
1164 p2_port: &<Self::Process as Node>::Port,
1165 many: bool,
1166 server_hint: NetworkHint,
1167 ) -> Box<dyn FnOnce()> {
1168 if server_hint != NetworkHint::Auto {
1169 panic!(
1170 "Docker deployment only supports NetworkHint::Auto, got {:?}",
1171 server_hint
1172 );
1173 }
1174
1175 if many {
1177 p1.connection_info.borrow_mut().insert(
1178 *p1_port,
1179 (
1180 p2.docker_container_name.clone(),
1181 *p2_port,
1182 p2.network.clone(),
1183 ),
1184 );
1185 }
1186
1187 let serialized = format!("e2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1188
1189 Box::new(move || {
1190 trace!(name: "e2o_connect thunk", %serialized);
1191 })
1192 }
1193
1194 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1195 fn o2e_sink(
1196 p1: &Self::Process,
1197 p1_port: &<Self::Process as Node>::Port,
1198 p2: &Self::External,
1199 p2_port: &<Self::External as Node>::Port,
1200 shared_handle: String,
1201 ) -> syn::Expr {
1202 let sink_ident = syn::Ident::new(
1203 &format!("__hydro_deploy_{}_sink", &shared_handle),
1204 Span::call_site(),
1205 );
1206 parse_quote!(#sink_ident)
1207 }
1208
/// Returns the quoted expression that yields the cluster's member ids.
///
/// `of_cluster` is only recorded on the tracing span. The call is forwarded to the
/// argument-less free function `cluster_ids` (NOTE(review): presumably brought in by the
/// `deploy_runtime_containerized` glob import — confirm).
#[instrument(level = "trace", skip_all, fields(%of_cluster))]
fn cluster_ids(
    of_cluster: LocationKey,
) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
    // Inside the impl a bare call resolves to the free function, not to this method.
    cluster_ids()
}
1215
/// Returns the quoted expression that yields the current member's own id.
///
/// Forwards to the free function `cluster_self_id` (NOTE(review): presumably brought in by
/// the `deploy_runtime_containerized` glob import — confirm).
#[instrument(level = "trace", skip_all)]
fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
    // Inside the impl a bare call resolves to the free function, not to this method.
    cluster_self_id()
}
1220
/// Returns the quoted expression that yields a stream of membership events for
/// `location_id`.
///
/// `_env` and `_at_location` are unused; only `location_id` is forwarded to the free function
/// `cluster_membership_stream` (NOTE(review): presumably brought in by the
/// `deploy_runtime_containerized` glob import — confirm).
#[instrument(level = "trace", skip_all, fields(?location_id))]
fn cluster_membership_stream(
    _env: &mut Self::InstantiateEnv,
    _at_location: &LocationId,
    location_id: &LocationId,
) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
{
    // Inside the impl a bare call resolves to the free function, not to this method.
    cluster_membership_stream(location_id)
}
1230}
1231
/// Alphabet handed to `nanoid!` when generating random name suffixes in this file (network
/// names and image tags): the ten ASCII digits followed by the 26 lowercase ASCII letters.
///
/// NOTE(review): presumably restricted to lowercase alphanumerics because Docker image names
/// must be lowercase — confirm.
const CONTAINER_ALPHABET: [char; 36] = [
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
    'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
];
1236
1237#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location_key, %deployment_instance))]
1238fn get_docker_image_name(
1239 name_hint: &str,
1240 location_key: LocationKey,
1241 deployment_instance: &str,
1242) -> String {
1243 let name_hint = name_hint
1244 .split("::")
1245 .last()
1246 .unwrap()
1247 .to_ascii_lowercase()
1248 .replace(".", "-")
1249 .replace("_", "-")
1250 .replace("::", "-");
1251
1252 let image_unique_tag = nanoid::nanoid!(6, &CONTAINER_ALPHABET);
1253
1254 format!("hy-{name_hint}-{image_unique_tag}-{deployment_instance}-{location_key}")
1255}
1256
1257#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1258fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1259 if let Some(instance) = instance {
1260 format!("{image_name}-{instance}")
1261 } else {
1262 image_name.to_owned()
1263 }
1264}
/// Specification from which a `DockerDeployProcess` is built (see its `ProcessSpec` impl).
#[derive(Clone)]
pub struct DockerDeployProcessSpec {
    /// Forwarded unchanged to the built process.
    /// NOTE(review): presumably extra options for compiling the process crate — confirm.
    compilation_options: Option<String>,
    /// Forwarded unchanged to the built process.
    /// NOTE(review): exact meaning of the entries is not visible here — confirm.
    config: Vec<String>,
    /// Docker network handed to the built process.
    network: DockerNetwork,
    /// Deployment identifier embedded in the generated image name.
    deployment_instance: String,
}
1273
1274impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1275 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1276 fn build(self, key: LocationKey, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1277 DockerDeployProcess {
1278 key,
1279 name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1280
1281 next_port: Rc::new(RefCell::new(1000)),
1282 rust_crate: Rc::new(RefCell::new(None)),
1283
1284 exposed_ports: Rc::new(RefCell::new(Vec::new())),
1285
1286 docker_container_name: Rc::new(RefCell::new(None)),
1287
1288 compilation_options: self.compilation_options,
1289 config: self.config,
1290
1291 network: self.network.clone(),
1292 }
1293 }
1294}
1295
/// Specification from which a `DockerDeployCluster` is built (see its `ClusterSpec` impl).
#[derive(Clone)]
pub struct DockerDeployClusterSpec {
    /// Forwarded unchanged to the built cluster.
    /// NOTE(review): presumably extra options for compiling the cluster crate — confirm.
    compilation_options: Option<String>,
    /// Forwarded unchanged to the built cluster.
    /// NOTE(review): exact meaning of the entries is not visible here — confirm.
    config: Vec<String>,
    /// Forwarded unchanged to the built cluster.
    /// NOTE(review): presumably the number of cluster members to start — confirm.
    count: usize,
    /// Deployment identifier embedded in the generated image name.
    deployment_instance: String,
}
1304
1305impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1306 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1307 fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1308 DockerDeployCluster {
1309 key,
1310 name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1311
1312 next_port: Rc::new(RefCell::new(1000)),
1313 rust_crate: Rc::new(RefCell::new(None)),
1314
1315 docker_container_name: Rc::new(RefCell::new(Vec::new())),
1316
1317 compilation_options: self.compilation_options,
1318 config: self.config,
1319
1320 count: self.count,
1321 }
1322 }
1323}
1324
/// Specification from which a `DockerDeployExternal` is built (see its `ExternalSpec` impl).
pub struct DockerDeployExternalSpec {
    /// Name given to the built external.
    name: String,
}
1329
1330impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1331 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1332 fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1333 DockerDeployExternal {
1334 name: self.name,
1335 next_port: Rc::new(RefCell::new(10000)),
1336 ports: Rc::new(RefCell::new(HashMap::new())),
1337 connection_info: Rc::new(RefCell::new(HashMap::new())),
1338 }
1339 }
1340}