1use std::fmt::Debug;
17use std::marker::PhantomData;
18use std::num::ParseIntError;
19use std::time::Duration;
20
21use bytes::{Bytes, BytesMut};
22use futures::stream::Stream as FuturesStream;
23use proc_macro2::Span;
24use quote::quote;
25use serde::de::DeserializeOwned;
26use serde::{Deserialize, Serialize};
27use slotmap::{Key, new_key_type};
28use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
29use stageleft::{QuotedWithContext, q, quote_type};
30use syn::parse_quote;
31use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
32
33use crate::compile::ir::{
34 ClusterMembersState, DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
35};
36use crate::forward_handle::ForwardRef;
37#[cfg(stageleft_runtime)]
38use crate::forward_handle::{CycleCollection, ForwardHandle};
39use crate::live_collections::boundedness::{Bounded, Unbounded};
40use crate::live_collections::keyed_stream::KeyedStream;
41use crate::live_collections::singleton::Singleton;
42use crate::live_collections::stream::{
43 ExactlyOnce, NoOrder, Ordering, Retries, Stream, TotalOrder,
44};
45use crate::location::dynamic::LocationId;
46use crate::location::external_process::{
47 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
48};
49use crate::nondet::NonDet;
50#[cfg(feature = "sim")]
51use crate::sim::SimSender;
52use crate::staging_util::get_this_crate;
53
54pub mod dynamic;
55
56pub mod external_process;
57pub use external_process::External;
58
59pub mod process;
60pub use process::Process;
61
62pub mod cluster;
63pub use cluster::Cluster;
64
65pub mod member_id;
66pub use member_id::{MemberId, TaglessMemberId};
67
68pub mod tick;
69pub use tick::{Atomic, NoTick, Tick};
70
71#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
74pub enum MembershipEvent {
75 Joined,
77 Left,
79}
80
/// A hint about how a network port for external communication should be set up.
///
/// It is forwarded as the `port_hint` of an external input node by
/// `bind_single_client` and `bidi_external_many_bytes`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum NetworkHint {
    /// No specific preference (presumably the deployment picks the transport —
    /// TODO confirm against the deploy backends).
    Auto,
    /// Use a TCP port. The payload optionally names a port number; `None`
    /// presumably lets the system choose one — TODO confirm.
    TcpPort(Option<u16>),
}
96
97pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
98 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
99}
100
101#[stageleft::export(LocationKey)]
102new_key_type! {
103 pub struct LocationKey;
105}
106
107impl std::fmt::Display for LocationKey {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 write!(f, "loc{:?}", self.data()) }
111}
112
113impl std::str::FromStr for LocationKey {
116 type Err = Option<ParseIntError>;
117
118 fn from_str(s: &str) -> Result<Self, Self::Err> {
119 let nvn = s.strip_prefix("loc").ok_or(None)?;
120 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
121 let idx: u64 = idx.parse()?;
122 let ver: u64 = ver.parse()?;
123 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
124 }
125}
126
127impl LocationKey {
128 pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); #[cfg(test)]
134 pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000001)); #[cfg(test)]
138 pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000ff00000002)); }
140
141impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
143 type O = LocationKey;
144
145 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
146 where
147 Self: Sized,
148 {
149 let root = get_this_crate();
150 let n = Key::data(&self).as_ffi();
151 (
152 QuoteTokens {
153 prelude: None,
154 expr: Some(quote! {
155 #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
156 }),
157 },
158 (),
159 )
160 }
161}
162
163#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
165pub enum LocationType {
166 Process,
168 Cluster,
170 External,
172}
173
174#[expect(
188 private_bounds,
189 reason = "only internal Hydro code can define location types"
190)]
191pub trait Location<'a>: dynamic::DynLocation {
192 type Root: Location<'a>;
197
198 fn root(&self) -> Self::Root;
203
204 fn try_tick(&self) -> Option<Tick<Self>> {
211 if Self::is_top_level() {
212 let id = self.flow_state().borrow_mut().next_clock_id();
213 Some(Tick {
214 id,
215 l: self.clone(),
216 })
217 } else {
218 None
219 }
220 }
221
222 fn id(&self) -> LocationId {
224 dynamic::DynLocation::id(self)
225 }
226
227 fn tick(&self) -> Tick<Self>
253 where
254 Self: NoTick,
255 {
256 let id = self.flow_state().borrow_mut().next_clock_id();
257 Tick {
258 id,
259 l: self.clone(),
260 }
261 }
262
263 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
288 where
289 Self: Sized + NoTick,
290 {
291 Stream::new(
292 self.clone(),
293 HydroNode::Source {
294 source: HydroSource::Spin(),
295 metadata: self.new_node_metadata(Stream::<
296 (),
297 Self,
298 Unbounded,
299 TotalOrder,
300 ExactlyOnce,
301 >::collection_kind()),
302 },
303 )
304 }
305
306 fn source_stream<T, E>(
327 &self,
328 e: impl QuotedWithContext<'a, E, Self>,
329 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
330 where
331 E: FuturesStream<Item = T> + Unpin,
332 Self: Sized + NoTick,
333 {
334 let e = e.splice_untyped_ctx(self);
335
336 Stream::new(
337 self.clone(),
338 HydroNode::Source {
339 source: HydroSource::Stream(e.into()),
340 metadata: self.new_node_metadata(Stream::<
341 T,
342 Self,
343 Unbounded,
344 TotalOrder,
345 ExactlyOnce,
346 >::collection_kind()),
347 },
348 )
349 }
350
351 fn source_iter<T, E>(
373 &self,
374 e: impl QuotedWithContext<'a, E, Self>,
375 ) -> Stream<T, Self, Bounded, TotalOrder, ExactlyOnce>
376 where
377 E: IntoIterator<Item = T>,
378 Self: Sized,
379 {
380 let e = e.splice_typed_ctx(self);
381
382 Stream::new(
383 self.clone(),
384 HydroNode::Source {
385 source: HydroSource::Iter(e.into()),
386 metadata: self.new_node_metadata(
387 Stream::<T, Self, Bounded, TotalOrder, ExactlyOnce>::collection_kind(),
388 ),
389 },
390 )
391 }
392
393 fn source_cluster_members<C: 'a>(
427 &self,
428 cluster: &Cluster<'a, C>,
429 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self, Unbounded>
430 where
431 Self: Sized + NoTick,
432 {
433 Stream::new(
434 self.clone(),
435 HydroNode::Source {
436 source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
437 metadata: self.new_node_metadata(Stream::<
438 (TaglessMemberId, MembershipEvent),
439 Self,
440 Unbounded,
441 TotalOrder,
442 ExactlyOnce,
443 >::collection_kind()),
444 },
445 )
446 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
447 .into_keyed()
448 }
449
450 fn source_external_bytes<L>(
458 &self,
459 from: &External<L>,
460 ) -> (
461 ExternalBytesPort,
462 Stream<BytesMut, Self, Unbounded, TotalOrder, ExactlyOnce>,
463 )
464 where
465 Self: Sized + NoTick,
466 {
467 let (port, stream, sink) =
468 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
469
470 sink.complete(self.source_iter(q!([])));
471
472 (port, stream)
473 }
474
475 #[expect(clippy::type_complexity, reason = "stream markers")]
482 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
483 &self,
484 from: &External<L>,
485 ) -> (
486 ExternalBincodeSink<T, NotMany, O, R>,
487 Stream<T, Self, Unbounded, O, R>,
488 )
489 where
490 Self: Sized + NoTick,
491 T: Serialize + DeserializeOwned,
492 {
493 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
494 sink.complete(self.source_iter(q!([])));
495
496 (
497 ExternalBincodeSink {
498 process_key: from.key,
499 port_id: port.port_id,
500 _phantom: PhantomData,
501 },
502 stream.weaken_ordering().weaken_retries(),
503 )
504 }
505
506 #[cfg(feature = "sim")]
511 #[expect(clippy::type_complexity, reason = "stream markers")]
512 fn sim_input<T, O: Ordering, R: Retries>(
513 &self,
514 ) -> (SimSender<T, O, R>, Stream<T, Self, Unbounded, O, R>)
515 where
516 Self: Sized + NoTick,
517 T: Serialize + DeserializeOwned,
518 {
519 let external_location: External<'a, ()> = External {
520 key: LocationKey::FIRST,
521 flow_state: self.flow_state().clone(),
522 _phantom: PhantomData,
523 };
524
525 let (external, stream) = self.source_external_bincode(&external_location);
526
527 (SimSender(external.port_id, PhantomData), stream)
528 }
529
530 fn embedded_input<T>(
536 &self,
537 name: impl Into<String>,
538 ) -> Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>
539 where
540 Self: Sized + NoTick,
541 {
542 let ident = syn::Ident::new(&name.into(), Span::call_site());
543
544 Stream::new(
545 self.clone(),
546 HydroNode::Source {
547 source: HydroSource::Embedded(ident),
548 metadata: self.new_node_metadata(Stream::<
549 T,
550 Self,
551 Unbounded,
552 TotalOrder,
553 ExactlyOnce,
554 >::collection_kind()),
555 },
556 )
557 }
558
559 #[expect(clippy::type_complexity, reason = "stream markers")]
604 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
605 &self,
606 from: &External<L>,
607 port_hint: NetworkHint,
608 ) -> (
609 ExternalBytesPort<NotMany>,
610 Stream<<Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
611 ForwardHandle<'a, Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>,
612 )
613 where
614 Self: Sized + NoTick,
615 {
616 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
617
618 let (fwd_ref, to_sink) =
619 self.forward_ref::<Stream<T, Self, Unbounded, TotalOrder, ExactlyOnce>>();
620 let mut flow_state_borrow = self.flow_state().borrow_mut();
621
622 flow_state_borrow.push_root(HydroRoot::SendExternal {
623 to_external_key: from.key,
624 to_port_id: next_external_port_id,
625 to_many: false,
626 unpaired: false,
627 serialize_fn: None,
628 instantiate_fn: DebugInstantiate::Building,
629 input: Box::new(to_sink.ir_node.into_inner()),
630 op_metadata: HydroIrOpMetadata::new(),
631 });
632
633 let raw_stream: Stream<
634 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
635 Self,
636 Unbounded,
637 TotalOrder,
638 ExactlyOnce,
639 > = Stream::new(
640 self.clone(),
641 HydroNode::ExternalInput {
642 from_external_key: from.key,
643 from_port_id: next_external_port_id,
644 from_many: false,
645 codec_type: quote_type::<Codec>().into(),
646 port_hint,
647 instantiate_fn: DebugInstantiate::Building,
648 deserialize_fn: None,
649 metadata: self.new_node_metadata(Stream::<
650 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
651 Self,
652 Unbounded,
653 TotalOrder,
654 ExactlyOnce,
655 >::collection_kind()),
656 },
657 );
658
659 (
660 ExternalBytesPort {
661 process_key: from.key,
662 port_id: next_external_port_id,
663 _phantom: PhantomData,
664 },
665 raw_stream.flatten_ordered(),
666 fwd_ref,
667 )
668 }
669
670 #[expect(clippy::type_complexity, reason = "stream markers")]
680 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
681 &self,
682 from: &External<L>,
683 ) -> (
684 ExternalBincodeBidi<InT, OutT, NotMany>,
685 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
686 ForwardHandle<'a, Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>,
687 )
688 where
689 Self: Sized + NoTick,
690 {
691 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
692
693 let (fwd_ref, to_sink) =
694 self.forward_ref::<Stream<OutT, Self, Unbounded, TotalOrder, ExactlyOnce>>();
695 let mut flow_state_borrow = self.flow_state().borrow_mut();
696
697 let root = get_this_crate();
698
699 let out_t_type = quote_type::<OutT>();
700 let ser_fn: syn::Expr = syn::parse_quote! {
701 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
702 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
703 )
704 };
705
706 flow_state_borrow.push_root(HydroRoot::SendExternal {
707 to_external_key: from.key,
708 to_port_id: next_external_port_id,
709 to_many: false,
710 unpaired: false,
711 serialize_fn: Some(ser_fn.into()),
712 instantiate_fn: DebugInstantiate::Building,
713 input: Box::new(to_sink.ir_node.into_inner()),
714 op_metadata: HydroIrOpMetadata::new(),
715 });
716
717 let in_t_type = quote_type::<InT>();
718
719 let deser_fn: syn::Expr = syn::parse_quote! {
720 |res| {
721 let b = res.unwrap();
722 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
723 }
724 };
725
726 let raw_stream: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
727 self.clone(),
728 HydroNode::ExternalInput {
729 from_external_key: from.key,
730 from_port_id: next_external_port_id,
731 from_many: false,
732 codec_type: quote_type::<LengthDelimitedCodec>().into(),
733 port_hint: NetworkHint::Auto,
734 instantiate_fn: DebugInstantiate::Building,
735 deserialize_fn: Some(deser_fn.into()),
736 metadata: self.new_node_metadata(Stream::<
737 InT,
738 Self,
739 Unbounded,
740 TotalOrder,
741 ExactlyOnce,
742 >::collection_kind()),
743 },
744 );
745
746 (
747 ExternalBincodeBidi {
748 process_key: from.key,
749 port_id: next_external_port_id,
750 _phantom: PhantomData,
751 },
752 raw_stream,
753 fwd_ref,
754 )
755 }
756
757 #[expect(clippy::type_complexity, reason = "stream markers")]
769 fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
770 &self,
771 from: &External<L>,
772 port_hint: NetworkHint,
773 ) -> (
774 ExternalBytesPort<Many>,
775 KeyedStream<u64, <Codec as Decoder>::Item, Self, Unbounded, TotalOrder, ExactlyOnce>,
776 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
777 ForwardHandle<'a, KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>,
778 )
779 where
780 Self: Sized + NoTick,
781 {
782 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
783
784 let (fwd_ref, to_sink) =
785 self.forward_ref::<KeyedStream<u64, T, Self, Unbounded, NoOrder, ExactlyOnce>>();
786 let mut flow_state_borrow = self.flow_state().borrow_mut();
787
788 flow_state_borrow.push_root(HydroRoot::SendExternal {
789 to_external_key: from.key,
790 to_port_id: next_external_port_id,
791 to_many: true,
792 unpaired: false,
793 serialize_fn: None,
794 instantiate_fn: DebugInstantiate::Building,
795 input: Box::new(to_sink.entries().ir_node.into_inner()),
796 op_metadata: HydroIrOpMetadata::new(),
797 });
798
799 let raw_stream: Stream<
800 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
801 Self,
802 Unbounded,
803 TotalOrder,
804 ExactlyOnce,
805 > = Stream::new(
806 self.clone(),
807 HydroNode::ExternalInput {
808 from_external_key: from.key,
809 from_port_id: next_external_port_id,
810 from_many: true,
811 codec_type: quote_type::<Codec>().into(),
812 port_hint,
813 instantiate_fn: DebugInstantiate::Building,
814 deserialize_fn: None,
815 metadata: self.new_node_metadata(Stream::<
816 Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
817 Self,
818 Unbounded,
819 TotalOrder,
820 ExactlyOnce,
821 >::collection_kind()),
822 },
823 );
824
825 let membership_stream_ident = syn::Ident::new(
826 &format!(
827 "__hydro_deploy_many_{}_{}_membership",
828 from.key, next_external_port_id
829 ),
830 Span::call_site(),
831 );
832 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
833 let raw_membership_stream: KeyedStream<
834 u64,
835 bool,
836 Self,
837 Unbounded,
838 TotalOrder,
839 ExactlyOnce,
840 > = KeyedStream::new(
841 self.clone(),
842 HydroNode::Source {
843 source: HydroSource::Stream(membership_stream_expr.into()),
844 metadata: self.new_node_metadata(KeyedStream::<
845 u64,
846 bool,
847 Self,
848 Unbounded,
849 TotalOrder,
850 ExactlyOnce,
851 >::collection_kind()),
852 },
853 );
854
855 (
856 ExternalBytesPort {
857 process_key: from.key,
858 port_id: next_external_port_id,
859 _phantom: PhantomData,
860 },
861 raw_stream
862 .flatten_ordered() .into_keyed(),
864 raw_membership_stream.map(q!(|join| {
865 if join {
866 MembershipEvent::Joined
867 } else {
868 MembershipEvent::Left
869 }
870 })),
871 fwd_ref,
872 )
873 }
874
875 #[expect(clippy::type_complexity, reason = "stream markers")]
891 fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
892 &self,
893 from: &External<L>,
894 ) -> (
895 ExternalBincodeBidi<InT, OutT, Many>,
896 KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
897 KeyedStream<u64, MembershipEvent, Self, Unbounded, TotalOrder, ExactlyOnce>,
898 ForwardHandle<'a, KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
899 )
900 where
901 Self: Sized + NoTick,
902 {
903 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
904
905 let (fwd_ref, to_sink) =
906 self.forward_ref::<KeyedStream<u64, OutT, Self, Unbounded, NoOrder, ExactlyOnce>>();
907 let mut flow_state_borrow = self.flow_state().borrow_mut();
908
909 let root = get_this_crate();
910
911 let out_t_type = quote_type::<OutT>();
912 let ser_fn: syn::Expr = syn::parse_quote! {
913 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
914 |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
915 )
916 };
917
918 flow_state_borrow.push_root(HydroRoot::SendExternal {
919 to_external_key: from.key,
920 to_port_id: next_external_port_id,
921 to_many: true,
922 unpaired: false,
923 serialize_fn: Some(ser_fn.into()),
924 instantiate_fn: DebugInstantiate::Building,
925 input: Box::new(to_sink.entries().ir_node.into_inner()),
926 op_metadata: HydroIrOpMetadata::new(),
927 });
928
929 let in_t_type = quote_type::<InT>();
930
931 let deser_fn: syn::Expr = syn::parse_quote! {
932 |res| {
933 let (id, b) = res.unwrap();
934 (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
935 }
936 };
937
938 let raw_stream: KeyedStream<u64, InT, Self, Unbounded, TotalOrder, ExactlyOnce> =
939 KeyedStream::new(
940 self.clone(),
941 HydroNode::ExternalInput {
942 from_external_key: from.key,
943 from_port_id: next_external_port_id,
944 from_many: true,
945 codec_type: quote_type::<LengthDelimitedCodec>().into(),
946 port_hint: NetworkHint::Auto,
947 instantiate_fn: DebugInstantiate::Building,
948 deserialize_fn: Some(deser_fn.into()),
949 metadata: self.new_node_metadata(KeyedStream::<
950 u64,
951 InT,
952 Self,
953 Unbounded,
954 TotalOrder,
955 ExactlyOnce,
956 >::collection_kind()),
957 },
958 );
959
960 let membership_stream_ident = syn::Ident::new(
961 &format!(
962 "__hydro_deploy_many_{}_{}_membership",
963 from.key, next_external_port_id
964 ),
965 Span::call_site(),
966 );
967 let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
968 let raw_membership_stream: KeyedStream<
969 u64,
970 bool,
971 Self,
972 Unbounded,
973 TotalOrder,
974 ExactlyOnce,
975 > = KeyedStream::new(
976 self.clone(),
977 HydroNode::Source {
978 source: HydroSource::Stream(membership_stream_expr.into()),
979 metadata: self.new_node_metadata(KeyedStream::<
980 u64,
981 bool,
982 Self,
983 Unbounded,
984 TotalOrder,
985 ExactlyOnce,
986 >::collection_kind()),
987 },
988 );
989
990 (
991 ExternalBincodeBidi {
992 process_key: from.key,
993 port_id: next_external_port_id,
994 _phantom: PhantomData,
995 },
996 raw_stream,
997 raw_membership_stream.map(q!(|join| {
998 if join {
999 MembershipEvent::Joined
1000 } else {
1001 MembershipEvent::Left
1002 }
1003 })),
1004 fwd_ref,
1005 )
1006 }
1007
1008 fn singleton<T>(&self, e: impl QuotedWithContext<'a, T, Self>) -> Singleton<T, Self, Bounded>
1026 where
1027 T: Clone,
1028 Self: Sized,
1029 {
1030 let e = e.splice_untyped_ctx(self);
1031
1032 Singleton::new(
1033 self.clone(),
1034 HydroNode::SingletonSource {
1035 value: e.into(),
1036 first_tick_only: false,
1037 metadata: self.new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
1038 },
1039 )
1040 }
1041
1042 fn source_interval(
1052 &self,
1053 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1054 _nondet: NonDet,
1055 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1056 where
1057 Self: Sized + NoTick,
1058 {
1059 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1060 tokio::time::interval(interval)
1061 )))
1062 }
1063
1064 fn source_interval_delayed(
1075 &self,
1076 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1077 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1078 _nondet: NonDet,
1079 ) -> Stream<tokio::time::Instant, Self, Unbounded, TotalOrder, ExactlyOnce>
1080 where
1081 Self: Sized + NoTick,
1082 {
1083 self.source_stream(q!(tokio_stream::wrappers::IntervalStream::new(
1084 tokio::time::interval_at(tokio::time::Instant::now() + delay, interval)
1085 )))
1086 }
1087
1088 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1122 where
1123 S: CycleCollection<'a, ForwardRef, Location = Self>,
1124 {
1125 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1126 (
1127 ForwardHandle::new(cycle_id, Location::id(self)),
1128 S::create_source(cycle_id, self.clone()),
1129 )
1130 }
1131}
1132
1133#[cfg(feature = "deploy")]
1134#[cfg(test)]
1135mod tests {
1136 use std::collections::HashSet;
1137
1138 use futures::{SinkExt, StreamExt};
1139 use hydro_deploy::Deployment;
1140 use stageleft::q;
1141 use tokio_util::codec::LengthDelimitedCodec;
1142
1143 use crate::compile::builder::FlowBuilder;
1144 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1145 use crate::location::{Location, NetworkHint};
1146 use crate::nondet::nondet;
1147
1148 #[tokio::test]
1149 async fn top_level_singleton_replay_cardinality() {
1150 let mut deployment = Deployment::new();
1151
1152 let mut flow = FlowBuilder::new();
1153 let node = flow.process::<()>();
1154 let external = flow.external::<()>();
1155
1156 let (in_port, input) =
1157 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1158 let singleton = node.singleton(q!(123));
1159 let tick = node.tick();
1160 let out = input
1161 .batch(&tick, nondet!())
1162 .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
1163 .cross_singleton(
1164 singleton
1165 .snapshot(&tick, nondet!())
1166 .into_stream()
1167 .count(),
1168 )
1169 .all_ticks()
1170 .send_bincode_external(&external);
1171
1172 let nodes = flow
1173 .with_process(&node, deployment.Localhost())
1174 .with_external(&external, deployment.Localhost())
1175 .deploy(&mut deployment);
1176
1177 deployment.deploy().await.unwrap();
1178
1179 let mut external_in = nodes.connect(in_port).await;
1180 let mut external_out = nodes.connect(out).await;
1181
1182 deployment.start().await.unwrap();
1183
1184 external_in.send(1).await.unwrap();
1185 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1186
1187 external_in.send(2).await.unwrap();
1188 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1189 }
1190
1191 #[tokio::test]
1192 async fn tick_singleton_replay_cardinality() {
1193 let mut deployment = Deployment::new();
1194
1195 let mut flow = FlowBuilder::new();
1196 let node = flow.process::<()>();
1197 let external = flow.external::<()>();
1198
1199 let (in_port, input) =
1200 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1201 let tick = node.tick();
1202 let singleton = tick.singleton(q!(123));
1203 let out = input
1204 .batch(&tick, nondet!())
1205 .cross_singleton(singleton.clone())
1206 .cross_singleton(singleton.into_stream().count())
1207 .all_ticks()
1208 .send_bincode_external(&external);
1209
1210 let nodes = flow
1211 .with_process(&node, deployment.Localhost())
1212 .with_external(&external, deployment.Localhost())
1213 .deploy(&mut deployment);
1214
1215 deployment.deploy().await.unwrap();
1216
1217 let mut external_in = nodes.connect(in_port).await;
1218 let mut external_out = nodes.connect(out).await;
1219
1220 deployment.start().await.unwrap();
1221
1222 external_in.send(1).await.unwrap();
1223 assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1224
1225 external_in.send(2).await.unwrap();
1226 assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1227 }
1228
1229 #[tokio::test]
1230 async fn external_bytes() {
1231 let mut deployment = Deployment::new();
1232
1233 let mut flow = FlowBuilder::new();
1234 let first_node = flow.process::<()>();
1235 let external = flow.external::<()>();
1236
1237 let (in_port, input) = first_node.source_external_bytes(&external);
1238 let out = input.send_bincode_external(&external);
1239
1240 let nodes = flow
1241 .with_process(&first_node, deployment.Localhost())
1242 .with_external(&external, deployment.Localhost())
1243 .deploy(&mut deployment);
1244
1245 deployment.deploy().await.unwrap();
1246
1247 let mut external_in = nodes.connect(in_port).await.1;
1248 let mut external_out = nodes.connect(out).await;
1249
1250 deployment.start().await.unwrap();
1251
1252 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1253
1254 assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1255 }
1256
1257 #[tokio::test]
1258 async fn multi_external_source() {
1259 let mut deployment = Deployment::new();
1260
1261 let mut flow = FlowBuilder::new();
1262 let first_node = flow.process::<()>();
1263 let external = flow.external::<()>();
1264
1265 let (in_port, input, _membership, complete_sink) =
1266 first_node.bidi_external_many_bincode(&external);
1267 let out = input.entries().send_bincode_external(&external);
1268 complete_sink.complete(
1269 first_node
1270 .source_iter::<(u64, ()), _>(q!([]))
1271 .into_keyed()
1272 .weaken_ordering(),
1273 );
1274
1275 let nodes = flow
1276 .with_process(&first_node, deployment.Localhost())
1277 .with_external(&external, deployment.Localhost())
1278 .deploy(&mut deployment);
1279
1280 deployment.deploy().await.unwrap();
1281
1282 let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1283 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1284 let external_out = nodes.connect(out).await;
1285
1286 deployment.start().await.unwrap();
1287
1288 external_in_1.send(123).await.unwrap();
1289 external_in_2.send(456).await.unwrap();
1290
1291 assert_eq!(
1292 external_out.take(2).collect::<HashSet<_>>().await,
1293 vec![(0, 123), (1, 456)].into_iter().collect()
1294 );
1295 }
1296
1297 #[tokio::test]
1298 async fn second_connection_only_multi_source() {
1299 let mut deployment = Deployment::new();
1300
1301 let mut flow = FlowBuilder::new();
1302 let first_node = flow.process::<()>();
1303 let external = flow.external::<()>();
1304
1305 let (in_port, input, _membership, complete_sink) =
1306 first_node.bidi_external_many_bincode(&external);
1307 let out = input.entries().send_bincode_external(&external);
1308 complete_sink.complete(
1309 first_node
1310 .source_iter::<(u64, ()), _>(q!([]))
1311 .into_keyed()
1312 .weaken_ordering(),
1313 );
1314
1315 let nodes = flow
1316 .with_process(&first_node, deployment.Localhost())
1317 .with_external(&external, deployment.Localhost())
1318 .deploy(&mut deployment);
1319
1320 deployment.deploy().await.unwrap();
1321
1322 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1324 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1325 let mut external_out = nodes.connect(out).await;
1326
1327 deployment.start().await.unwrap();
1328
1329 external_in_2.send(456).await.unwrap();
1330
1331 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1332 }
1333
1334 #[tokio::test]
1335 async fn multi_external_bytes() {
1336 let mut deployment = Deployment::new();
1337
1338 let mut flow = FlowBuilder::new();
1339 let first_node = flow.process::<()>();
1340 let external = flow.external::<()>();
1341
1342 let (in_port, input, _membership, complete_sink) = first_node
1343 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1344 let out = input.entries().send_bincode_external(&external);
1345 complete_sink.complete(
1346 first_node
1347 .source_iter(q!([]))
1348 .into_keyed()
1349 .weaken_ordering(),
1350 );
1351
1352 let nodes = flow
1353 .with_process(&first_node, deployment.Localhost())
1354 .with_external(&external, deployment.Localhost())
1355 .deploy(&mut deployment);
1356
1357 deployment.deploy().await.unwrap();
1358
1359 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1360 let mut external_in_2 = nodes.connect(in_port).await.1;
1361 let external_out = nodes.connect(out).await;
1362
1363 deployment.start().await.unwrap();
1364
1365 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1366 external_in_2.send(vec![4, 5].into()).await.unwrap();
1367
1368 assert_eq!(
1369 external_out.take(2).collect::<HashSet<_>>().await,
1370 vec![
1371 (0, (&[1u8, 2, 3] as &[u8]).into()),
1372 (1, (&[4u8, 5] as &[u8]).into())
1373 ]
1374 .into_iter()
1375 .collect()
1376 );
1377 }
1378
1379 #[tokio::test]
1380 async fn single_client_external_bytes() {
1381 let mut deployment = Deployment::new();
1382 let mut flow = FlowBuilder::new();
1383 let first_node = flow.process::<()>();
1384 let external = flow.external::<()>();
1385 let (port, input, complete_sink) = first_node
1386 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1387 complete_sink.complete(input.map(q!(|data| {
1388 let mut resp: Vec<u8> = data.into();
1389 resp.push(42);
1390 resp.into() })));
1392
1393 let nodes = flow
1394 .with_process(&first_node, deployment.Localhost())
1395 .with_external(&external, deployment.Localhost())
1396 .deploy(&mut deployment);
1397
1398 deployment.deploy().await.unwrap();
1399 deployment.start().await.unwrap();
1400
1401 let (mut external_out, mut external_in) = nodes.connect(port).await;
1402
1403 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1404 assert_eq!(
1405 external_out.next().await.unwrap().unwrap(),
1406 vec![1, 2, 3, 42]
1407 );
1408 }
1409
1410 #[tokio::test]
1411 async fn echo_external_bytes() {
1412 let mut deployment = Deployment::new();
1413
1414 let mut flow = FlowBuilder::new();
1415 let first_node = flow.process::<()>();
1416 let external = flow.external::<()>();
1417
1418 let (port, input, _membership, complete_sink) = first_node
1419 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1420 complete_sink
1421 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1422
1423 let nodes = flow
1424 .with_process(&first_node, deployment.Localhost())
1425 .with_external(&external, deployment.Localhost())
1426 .deploy(&mut deployment);
1427
1428 deployment.deploy().await.unwrap();
1429
1430 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1431 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1432
1433 deployment.start().await.unwrap();
1434
1435 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1436 external_in_2.send(vec![4, 5].into()).await.unwrap();
1437
1438 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1439 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1440 }
1441
1442 #[tokio::test]
1443 async fn echo_external_bincode() {
1444 let mut deployment = Deployment::new();
1445
1446 let mut flow = FlowBuilder::new();
1447 let first_node = flow.process::<()>();
1448 let external = flow.external::<()>();
1449
1450 let (port, input, _membership, complete_sink) =
1451 first_node.bidi_external_many_bincode(&external);
1452 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1453
1454 let nodes = flow
1455 .with_process(&first_node, deployment.Localhost())
1456 .with_external(&external, deployment.Localhost())
1457 .deploy(&mut deployment);
1458
1459 deployment.deploy().await.unwrap();
1460
1461 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1462 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1463
1464 deployment.start().await.unwrap();
1465
1466 external_in_1.send("hi".to_owned()).await.unwrap();
1467 external_in_2.send("hello".to_owned()).await.unwrap();
1468
1469 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1470 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1471 }
1472}