1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bytes::Bytes;
9use dfir_lang::graph::DfirGraph;
10use futures::{Sink, Stream};
11use proc_macro2::Span;
12use serde::{Deserialize, Serialize};
13use stageleft::QuotedWithContext;
14use syn::parse_quote;
15use tracing::{instrument, trace};
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct HydroManifest {
20 pub processes: HashMap<String, ProcessManifest>,
22 pub clusters: HashMap<String, ClusterManifest>,
24}
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct BuildConfig {
29 pub project_dir: String,
31 pub target_dir: String,
33 pub bin_name: String,
35 pub package_name: String,
37 pub features: Vec<String>,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct PortInfo {
44 pub port: u16,
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct ProcessManifest {
51 pub build: BuildConfig,
53 pub location_key: LocationKey,
55 pub ports: HashMap<String, PortInfo>,
57 pub task_family: String,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct ClusterManifest {
64 pub build: BuildConfig,
66 pub location_key: LocationKey,
68 pub ports: Vec<u16>,
70 pub default_count: usize,
72 pub task_family_prefix: String,
74}
75
76use super::deploy_runtime_containerized_ecs::*;
77use crate::compile::builder::ExternalPortId;
78use crate::compile::deploy::DeployResult;
79use crate::compile::deploy_provider::{
80 ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
81};
82use crate::compile::trybuild::generate::create_graph_trybuild;
83use crate::location::dynamic::LocationId;
84use crate::location::member_id::TaglessMemberId;
85use crate::location::{LocationKey, MembershipEvent, NetworkHint};
86
87#[derive(Clone)]
89pub struct EcsDeployProcess {
90 id: LocationKey,
91 name: String,
92 next_port: Rc<RefCell<u16>>,
93
94 exposed_ports: Rc<RefCell<HashMap<String, PortInfo>>>,
95
96 trybuild_config:
97 Rc<RefCell<Option<(String, crate::compile::trybuild::generate::TrybuildConfig)>>>,
98}
99
100impl Node for EcsDeployProcess {
101 type Port = u16;
102 type Meta = ();
103 type InstantiateEnv = EcsDeploy;
104
105 #[instrument(level = "trace", skip_all, ret, fields(id = %self.id, name = self.name))]
106 fn next_port(&self) -> Self::Port {
107 let port = {
108 let mut borrow = self.next_port.borrow_mut();
109 let port = *borrow;
110 *borrow += 1;
111 port
112 };
113
114 port
115 }
116
117 #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name))]
118 fn update_meta(&self, _meta: &Self::Meta) {}
119
120 #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name, ?meta, extra_stmts = extra_stmts.len()))]
121 fn instantiate(
122 &self,
123 _env: &mut Self::InstantiateEnv,
124 meta: &mut Self::Meta,
125 graph: DfirGraph,
126 extra_stmts: &[syn::Stmt],
127 sidecars: &[syn::Expr],
128 ) {
129 let (bin_name, config) = create_graph_trybuild(
130 graph,
131 extra_stmts,
132 sidecars,
133 Some(&self.name),
134 crate::compile::trybuild::generate::DeployMode::Containerized,
135 crate::compile::trybuild::generate::LinkingMode::Static,
136 );
137
138 *self.trybuild_config.borrow_mut() = Some((bin_name, config));
140 }
141}
142
143#[derive(Clone)]
145pub struct EcsDeployCluster {
146 id: LocationKey,
147 name: String,
148 next_port: Rc<RefCell<u16>>,
149
150 count: usize,
151
152 trybuild_config:
154 Rc<RefCell<Option<(String, crate::compile::trybuild::generate::TrybuildConfig)>>>,
155}
156
157impl Node for EcsDeployCluster {
158 type Port = u16;
159 type Meta = ();
160 type InstantiateEnv = EcsDeploy;
161
162 #[instrument(level = "trace", skip_all, ret, fields(id = %self.id, name = self.name))]
163 fn next_port(&self) -> Self::Port {
164 let port = {
165 let mut borrow = self.next_port.borrow_mut();
166 let port = *borrow;
167 *borrow += 1;
168 port
169 };
170
171 port
172 }
173
174 #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name))]
175 fn update_meta(&self, _meta: &Self::Meta) {}
176
177 #[instrument(level = "trace", skip_all, fields(id = %self.id, name = self.name, extra_stmts = extra_stmts.len()))]
178 fn instantiate(
179 &self,
180 _env: &mut Self::InstantiateEnv,
181 _meta: &mut Self::Meta,
182 graph: DfirGraph,
183 extra_stmts: &[syn::Stmt],
184 sidecars: &[syn::Expr],
185 ) {
186 let (bin_name, config) = create_graph_trybuild(
187 graph,
188 extra_stmts,
189 sidecars,
190 Some(&self.name),
191 crate::compile::trybuild::generate::DeployMode::Containerized,
192 crate::compile::trybuild::generate::LinkingMode::Static,
193 );
194
195 *self.trybuild_config.borrow_mut() = Some((bin_name, config));
197 }
198}
199
200#[derive(Clone, Debug)]
202pub struct EcsDeployExternal {
203 name: String,
204 next_port: Rc<RefCell<u16>>,
205}
206
207impl Node for EcsDeployExternal {
208 type Port = u16;
209 type Meta = ();
210 type InstantiateEnv = EcsDeploy;
211
212 #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
213 fn next_port(&self) -> Self::Port {
214 let port = {
215 let mut borrow = self.next_port.borrow_mut();
216 let port = *borrow;
217 *borrow += 1;
218 port
219 };
220
221 port
222 }
223
224 #[instrument(level = "trace", skip_all, fields(name = self.name))]
225 fn update_meta(&self, _meta: &Self::Meta) {}
226
227 #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
228 fn instantiate(
229 &self,
230 _env: &mut Self::InstantiateEnv,
231 meta: &mut Self::Meta,
232 graph: DfirGraph,
233 extra_stmts: &[syn::Stmt],
234 sidecars: &[syn::Expr],
235 ) {
236 trace!(name: "surface", surface = graph.surface_syntax_string());
237 }
238}
239
240type DynSourceSink<Out, In, InErr> = (
241 Pin<Box<dyn Stream<Item = Out>>>,
242 Pin<Box<dyn Sink<In, Error = InErr>>>,
243);
244
245impl<'a> RegisterPort<'a, EcsDeploy> for EcsDeployExternal {
246 #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
247 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {}
248
249 #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
250 fn as_bytes_bidi(
251 &self,
252 _external_port_id: ExternalPortId,
253 ) -> impl Future<
254 Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
255 > + 'a {
256 async { unimplemented!() }
257 }
258
259 #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
260 fn as_bincode_bidi<InT, OutT>(
261 &self,
262 _external_port_id: ExternalPortId,
263 ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
264 where
265 InT: Serialize + 'static,
266 OutT: serde::de::DeserializeOwned + 'static,
267 {
268 async { unimplemented!() }
269 }
270
271 #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
272 fn as_bincode_sink<T>(
273 &self,
274 _external_port_id: ExternalPortId,
275 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
276 where
277 T: Serialize + 'static,
278 {
279 async { unimplemented!() }
280 }
281
282 #[expect(clippy::manual_async_fn, reason = "matches trait signature")]
283 fn as_bincode_source<T>(
284 &self,
285 _external_port_id: ExternalPortId,
286 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
287 where
288 T: serde::de::DeserializeOwned + 'static,
289 {
290 async { unimplemented!() }
291 }
292}
293
294pub struct EcsDeploy;
296
297impl Default for EcsDeploy {
298 fn default() -> Self {
299 Self::new()
300 }
301}
302
303impl EcsDeploy {
304 pub fn new() -> Self {
306 Self
307 }
308
309 pub fn add_ecs_process(&mut self) -> EcsDeployProcessSpec {
311 EcsDeployProcessSpec
312 }
313
314 pub fn add_ecs_cluster(&mut self, count: usize) -> EcsDeployClusterSpec {
316 EcsDeployClusterSpec { count }
317 }
318
319 pub fn add_external(&self, name: String) -> EcsDeployExternalSpec {
321 EcsDeployExternalSpec { name }
322 }
323
324 #[instrument(level = "trace", skip_all)]
329 pub fn export_for_cdk(&self, nodes: &DeployResult<'_, Self>) -> HydroManifest {
330 let mut manifest = HydroManifest {
331 processes: HashMap::new(),
332 clusters: HashMap::new(),
333 };
334
335 for (location_id, name_hint, process) in nodes.get_all_processes() {
336 let raw_id = match location_id {
337 LocationId::Process(id) => id,
338 _ => unreachable!(),
339 };
340 let task_family = get_ecs_container_name(&process.name, None);
341 let ports = process.exposed_ports.borrow().clone();
342
343 let (bin_name, trybuild_config) = process
344 .trybuild_config
345 .borrow()
346 .clone()
347 .expect("trybuild_config should be set after instantiate");
348
349 let mut features = vec!["hydro___feature_ecs_runtime".to_owned()];
350 if let Some(extra_features) = trybuild_config.features {
351 features.extend(extra_features);
352 }
353
354 let crate_name = trybuild_config
355 .project_dir
356 .file_name()
357 .and_then(|n| n.to_str())
358 .unwrap_or("unknown")
359 .replace("_", "-");
360 let package_name = format!("{}-hydro-trybuild", crate_name);
361
362 manifest.processes.insert(
363 name_hint.to_owned(),
364 ProcessManifest {
365 build: BuildConfig {
366 project_dir: trybuild_config.project_dir.to_string_lossy().into_owned(),
367 target_dir: trybuild_config.target_dir.to_string_lossy().into_owned(),
368 bin_name,
369 package_name,
370 features,
371 },
372 location_key: raw_id,
373 ports,
374 task_family,
375 },
376 );
377 }
378
379 for (location_id, name_hint, cluster) in nodes.get_all_clusters() {
380 let raw_id = match location_id {
381 LocationId::Cluster(id) => id,
382 _ => unreachable!(),
383 };
384 let task_family_prefix = cluster.name.clone();
385
386 let (bin_name, trybuild_config) = cluster
387 .trybuild_config
388 .borrow()
389 .clone()
390 .expect("trybuild_config should be set after instantiate");
391
392 let mut features = vec!["hydro___feature_ecs_runtime".to_owned()];
393 if let Some(extra_features) = trybuild_config.features {
394 features.extend(extra_features);
395 }
396
397 let crate_name = trybuild_config
398 .project_dir
399 .file_name()
400 .and_then(|n| n.to_str())
401 .unwrap_or("unknown")
402 .replace("_", "-");
403 let package_name = format!("{}-hydro-trybuild", crate_name);
404
405 manifest.clusters.insert(
406 name_hint.to_owned(),
407 ClusterManifest {
408 build: BuildConfig {
409 project_dir: trybuild_config.project_dir.to_string_lossy().into_owned(),
410 target_dir: trybuild_config.target_dir.to_string_lossy().into_owned(),
411 bin_name,
412 package_name,
413 features,
414 },
415 location_key: raw_id,
416 ports: vec![],
417 default_count: cluster.count,
418 task_family_prefix,
419 },
420 );
421 }
422
423 manifest
424 }
425}
426
427impl<'a> Deploy<'a> for EcsDeploy {
428 type InstantiateEnv = Self;
429 type Process = EcsDeployProcess;
430 type Cluster = EcsDeployCluster;
431 type External = EcsDeployExternal;
432 type Meta = ();
433
434 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
435 fn o2o_sink_source(
436 _env: &mut Self::InstantiateEnv,
437 p1: &Self::Process,
438 p1_port: &<Self::Process as Node>::Port,
439 p2: &Self::Process,
440 p2_port: &<Self::Process as Node>::Port,
441 name: Option<&str>,
442 networking_info: &crate::networking::NetworkingInfo,
443 ) -> (syn::Expr, syn::Expr) {
444 match networking_info {
445 crate::networking::NetworkingInfo::Tcp {
446 fault: crate::networking::TcpFault::FailStop,
447 } => {}
448 _ => panic!("Unsupported networking info: {:?}", networking_info),
449 }
450
451 deploy_containerized_o2o(
452 &p2.name,
453 name.expect("channel name is required for containerized deployment"),
454 )
455 }
456
457 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
458 fn o2o_connect(
459 p1: &Self::Process,
460 p1_port: &<Self::Process as Node>::Port,
461 p2: &Self::Process,
462 p2_port: &<Self::Process as Node>::Port,
463 ) -> Box<dyn FnOnce()> {
464 let serialized = format!(
465 "o2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
466 p1.name, p2.name
467 );
468
469 Box::new(move || {
470 trace!(name: "o2o_connect thunk", %serialized);
471 })
472 }
473
474 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
475 fn o2m_sink_source(
476 _env: &mut Self::InstantiateEnv,
477 p1: &Self::Process,
478 p1_port: &<Self::Process as Node>::Port,
479 c2: &Self::Cluster,
480 c2_port: &<Self::Cluster as Node>::Port,
481 name: Option<&str>,
482 networking_info: &crate::networking::NetworkingInfo,
483 ) -> (syn::Expr, syn::Expr) {
484 match networking_info {
485 crate::networking::NetworkingInfo::Tcp {
486 fault: crate::networking::TcpFault::FailStop,
487 } => {}
488 _ => panic!("Unsupported networking info: {:?}", networking_info),
489 }
490
491 deploy_containerized_o2m(
492 name.expect("channel name is required for containerized deployment"),
493 )
494 }
495
496 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
497 fn o2m_connect(
498 p1: &Self::Process,
499 p1_port: &<Self::Process as Node>::Port,
500 c2: &Self::Cluster,
501 c2_port: &<Self::Cluster as Node>::Port,
502 ) -> Box<dyn FnOnce()> {
503 let serialized = format!(
504 "o2m_connect {}:{p1_port:?} -> {}:{c2_port:?}",
505 p1.name, c2.name
506 );
507
508 Box::new(move || {
509 trace!(name: "o2m_connect thunk", %serialized);
510 })
511 }
512
513 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
514 fn m2o_sink_source(
515 _env: &mut Self::InstantiateEnv,
516 c1: &Self::Cluster,
517 c1_port: &<Self::Cluster as Node>::Port,
518 p2: &Self::Process,
519 p2_port: &<Self::Process as Node>::Port,
520 name: Option<&str>,
521 networking_info: &crate::networking::NetworkingInfo,
522 ) -> (syn::Expr, syn::Expr) {
523 match networking_info {
524 crate::networking::NetworkingInfo::Tcp {
525 fault: crate::networking::TcpFault::FailStop,
526 } => {}
527 _ => panic!("Unsupported networking info: {:?}", networking_info),
528 }
529
530 deploy_containerized_m2o(
531 &p2.name,
532 name.expect("channel name is required for containerized deployment"),
533 )
534 }
535
536 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
537 fn m2o_connect(
538 c1: &Self::Cluster,
539 c1_port: &<Self::Cluster as Node>::Port,
540 p2: &Self::Process,
541 p2_port: &<Self::Process as Node>::Port,
542 ) -> Box<dyn FnOnce()> {
543 let serialized = format!(
544 "o2m_connect {}:{c1_port:?} -> {}:{p2_port:?}",
545 c1.name, p2.name
546 );
547
548 Box::new(move || {
549 trace!(name: "m2o_connect thunk", %serialized);
550 })
551 }
552
553 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
554 fn m2m_sink_source(
555 _env: &mut Self::InstantiateEnv,
556 c1: &Self::Cluster,
557 c1_port: &<Self::Cluster as Node>::Port,
558 c2: &Self::Cluster,
559 c2_port: &<Self::Cluster as Node>::Port,
560 name: Option<&str>,
561 networking_info: &crate::networking::NetworkingInfo,
562 ) -> (syn::Expr, syn::Expr) {
563 match networking_info {
564 crate::networking::NetworkingInfo::Tcp {
565 fault: crate::networking::TcpFault::FailStop,
566 } => {}
567 _ => panic!("Unsupported networking info: {:?}", networking_info),
568 }
569
570 deploy_containerized_m2m(
571 name.expect("channel name is required for containerized deployment"),
572 )
573 }
574
575 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
576 fn m2m_connect(
577 c1: &Self::Cluster,
578 c1_port: &<Self::Cluster as Node>::Port,
579 c2: &Self::Cluster,
580 c2_port: &<Self::Cluster as Node>::Port,
581 ) -> Box<dyn FnOnce()> {
582 let serialized = format!(
583 "m2m_connect {}:{c1_port:?} -> {}:{c2_port:?}",
584 c1.name, c2.name
585 );
586
587 Box::new(move || {
588 trace!(name: "m2m_connect thunk", %serialized);
589 })
590 }
591
592 #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
593 fn e2o_many_source(
594 extra_stmts: &mut Vec<syn::Stmt>,
595 p2: &Self::Process,
596 p2_port: &<Self::Process as Node>::Port,
597 codec_type: &syn::Type,
598 shared_handle: String,
599 ) -> syn::Expr {
600 p2.exposed_ports
601 .borrow_mut()
602 .insert(shared_handle.clone(), PortInfo { port: *p2_port });
603
604 let socket_ident = syn::Ident::new(
605 &format!("__hydro_deploy_many_{}_socket", &shared_handle),
606 Span::call_site(),
607 );
608
609 let source_ident = syn::Ident::new(
610 &format!("__hydro_deploy_many_{}_source", &shared_handle),
611 Span::call_site(),
612 );
613
614 let sink_ident = syn::Ident::new(
615 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
616 Span::call_site(),
617 );
618
619 let membership_ident = syn::Ident::new(
620 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
621 Span::call_site(),
622 );
623
624 let bind_addr = format!("0.0.0.0:{}", p2_port);
625
626 extra_stmts.push(syn::parse_quote! {
627 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
628 });
629
630 let root = crate::staging_util::get_this_crate();
631
632 extra_stmts.push(syn::parse_quote! {
633 let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
634 });
635
636 parse_quote!(#source_ident)
637 }
638
639 #[instrument(level = "trace", skip_all, fields(%shared_handle))]
640 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
641 let sink_ident = syn::Ident::new(
642 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
643 Span::call_site(),
644 );
645 parse_quote!(#sink_ident)
646 }
647
648 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?codec_type, %shared_handle))]
649 fn e2o_source(
650 extra_stmts: &mut Vec<syn::Stmt>,
651 p1: &Self::External,
652 p1_port: &<Self::External as Node>::Port,
653 p2: &Self::Process,
654 p2_port: &<Self::Process as Node>::Port,
655 codec_type: &syn::Type,
656 shared_handle: String,
657 ) -> syn::Expr {
658 p2.exposed_ports
660 .borrow_mut()
661 .insert(shared_handle.clone(), PortInfo { port: *p2_port });
662
663 let source_ident = syn::Ident::new(
664 &format!("__hydro_deploy_{}_source", &shared_handle),
665 Span::call_site(),
666 );
667
668 let bind_addr = format!("0.0.0.0:{}", p2_port);
669
670 let socket_ident = syn::Ident::new(
673 &format!("__hydro_deploy_{}_socket", &shared_handle),
674 Span::call_site(),
675 );
676
677 let sink_ident = syn::Ident::new(
678 &format!("__hydro_deploy_{}_sink", &shared_handle),
679 Span::call_site(),
680 );
681
682 extra_stmts.push(syn::parse_quote! {
683 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
684 });
685
686 let create_expr = deploy_containerized_external_sink_source_ident(bind_addr, socket_ident);
687
688 extra_stmts.push(syn::parse_quote! {
689 let (#sink_ident, #source_ident) = (#create_expr).split();
690 });
691
692 parse_quote!(#source_ident)
693 }
694
695 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
696 fn e2o_connect(
697 p1: &Self::External,
698 p1_port: &<Self::External as Node>::Port,
699 p2: &Self::Process,
700 p2_port: &<Self::Process as Node>::Port,
701 many: bool,
702 server_hint: NetworkHint,
703 ) -> Box<dyn FnOnce()> {
704 let serialized = format!(
705 "e2o_connect {}:{p1_port:?} -> {}:{p2_port:?}",
706 p1.name, p2.name
707 );
708
709 Box::new(move || {
710 trace!(name: "e2o_connect thunk", %serialized);
711 })
712 }
713
714 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
715 fn o2e_sink(
716 p1: &Self::Process,
717 p1_port: &<Self::Process as Node>::Port,
718 p2: &Self::External,
719 p2_port: &<Self::External as Node>::Port,
720 shared_handle: String,
721 ) -> syn::Expr {
722 let sink_ident = syn::Ident::new(
723 &format!("__hydro_deploy_{}_sink", &shared_handle),
724 Span::call_site(),
725 );
726 parse_quote!(#sink_ident)
727 }
728
729 #[instrument(level = "trace", skip_all, fields(%of_cluster))]
730 fn cluster_ids(
731 of_cluster: LocationKey,
732 ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
733 cluster_ids()
734 }
735
736 #[instrument(level = "trace", skip_all)]
737 fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
738 cluster_self_id()
739 }
740
741 #[instrument(level = "trace", skip_all, fields(?location_id))]
742 fn cluster_membership_stream(
743 _env: &mut Self::InstantiateEnv,
744 _at_location: &LocationId,
745 location_id: &LocationId,
746 ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
747 {
748 cluster_membership_stream(location_id)
749 }
750}
751
752#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location))]
753fn get_ecs_image_name(name_hint: &str, location: LocationKey) -> String {
754 let name_hint = name_hint
755 .split("::")
756 .last()
757 .unwrap()
758 .to_ascii_lowercase()
759 .replace(".", "-")
760 .replace("_", "-")
761 .replace("::", "-");
762
763 format!("hy-{name_hint}-{location}")
764}
765
766#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
767fn get_ecs_container_name(image_name: &str, instance: Option<usize>) -> String {
768 if let Some(instance) = instance {
769 format!("{image_name}-{instance}")
770 } else {
771 image_name.to_owned()
772 }
773}
774#[derive(Clone)]
776pub struct EcsDeployProcessSpec;
777
778impl<'a> ProcessSpec<'a, EcsDeploy> for EcsDeployProcessSpec {
779 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
780 fn build(self, id: LocationKey, name_hint: &'_ str) -> <EcsDeploy as Deploy<'a>>::Process {
781 EcsDeployProcess {
782 id,
783 name: get_ecs_image_name(name_hint, id),
784 next_port: Rc::new(RefCell::new(1000)),
785 exposed_ports: Rc::new(RefCell::new(HashMap::new())),
786 trybuild_config: Rc::new(RefCell::new(None)),
787 }
788 }
789}
790
791#[derive(Clone)]
793pub struct EcsDeployClusterSpec {
794 count: usize,
795}
796
797impl<'a> ClusterSpec<'a, EcsDeploy> for EcsDeployClusterSpec {
798 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
799 fn build(self, id: LocationKey, name_hint: &str) -> <EcsDeploy as Deploy<'a>>::Cluster {
800 EcsDeployCluster {
801 id,
802 name: get_ecs_image_name(name_hint, id),
803 next_port: Rc::new(RefCell::new(1000)),
804 count: self.count,
805 trybuild_config: Rc::new(RefCell::new(None)),
806 }
807 }
808}
809
810pub struct EcsDeployExternalSpec {
812 name: String,
813}
814
815impl<'a> ExternalSpec<'a, EcsDeploy> for EcsDeployExternalSpec {
816 #[instrument(level = "trace", skip_all, fields(%id, %name_hint))]
817 fn build(self, id: LocationKey, name_hint: &str) -> <EcsDeploy as Deploy<'a>>::External {
818 EcsDeployExternal {
819 name: self.name,
820 next_port: Rc::new(RefCell::new(10000)),
821 }
822 }
823}