1use std::fmt::Debug;
17use std::future::Future;
18#[cfg(feature = "tokio")]
19use std::marker::PhantomData;
20use std::num::ParseIntError;
21#[cfg(feature = "tokio")]
22use std::time::Duration;
23
24#[cfg(feature = "tokio")]
25use bytes::{Bytes, BytesMut};
26use futures::stream::Stream as FuturesStream;
27use proc_macro2::Span;
28use quote::quote;
29#[cfg(feature = "tokio")]
30use serde::de::DeserializeOwned;
31use serde::{Deserialize, Serialize};
32use slotmap::{Key, new_key_type};
33#[cfg(feature = "tokio")]
34use stageleft::quote_type;
35use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
36use stageleft::{QuotedWithContext, q};
37use syn::parse_quote;
38#[cfg(feature = "tokio")]
39use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
40
41#[cfg(feature = "tokio")]
42use crate::compile::ir::DebugInstantiate;
43use crate::compile::ir::{
44 ClusterMembersState, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
45};
46use crate::forward_handle::ForwardRef;
47#[cfg(stageleft_runtime)]
48use crate::forward_handle::{CycleCollection, ForwardHandle};
49use crate::live_collections::boundedness::{Bounded, Unbounded};
50use crate::live_collections::keyed_stream::KeyedStream;
51use crate::live_collections::singleton::Singleton;
52use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
53#[cfg(feature = "tokio")]
54use crate::live_collections::stream::{Ordering, Retries};
55#[cfg(stageleft_runtime)]
56use crate::location::dynamic::DynLocation;
57use crate::location::dynamic::{ClusterConsistency, LocationId};
58#[cfg(feature = "tokio")]
59use crate::location::external_process::{
60 ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
61};
62use crate::nondet::NonDet;
63#[cfg(feature = "tokio")]
64use crate::properties::manual_proof;
65#[cfg(feature = "sim")]
66use crate::sim::SimSender;
67use crate::staging_util::get_this_crate;
68
69pub mod dynamic;
70
71pub mod external_process;
72pub use external_process::External;
73
74pub mod process;
75pub use process::Process;
76
77pub mod cluster;
78pub use cluster::Cluster;
79
80pub mod member_id;
81pub use member_id::{MemberId, TaglessMemberId};
82
83pub mod tick;
84pub use tick::{Atomic, Tick};
85
/// A change in group membership: a member either joined or left.
///
/// In this module the events are produced keyed by a member id (cluster members) or by a
/// `u64` connection key (many-client external ports).
#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
pub enum MembershipEvent {
    /// The member became part of the group.
    Joined,
    /// The member is no longer part of the group.
    Left,
}
95
/// A hint describing how the network port of an external connection should be set up.
///
/// NOTE(review): in this file the hint is only forwarded into the IR (`port_hint`); how each
/// variant is honored is decided by the deployment backend — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum NetworkHint {
    /// No preference is expressed.
    Auto,
    /// Use TCP with an optional port number (presumably `None` lets the backend pick a
    /// port — TODO confirm).
    TcpPort(Option<u16>),
}
111
112pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
113 assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
114}
115
#[stageleft::export(LocationKey)]
new_key_type! {
    /// A `slotmap` key that identifies a location.
    pub struct LocationKey;
}
121
122impl std::fmt::Display for LocationKey {
123 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124 write!(f, "loc{:?}", self.data()) }
126}
127
128impl std::str::FromStr for LocationKey {
131 type Err = Option<ParseIntError>;
132
133 fn from_str(s: &str) -> Result<Self, Self::Err> {
134 let nvn = s.strip_prefix("loc").ok_or(None)?;
135 let (idx, ver) = nvn.split_once("v").ok_or(None)?;
136 let idx: u64 = idx.parse()?;
137 let ver: u64 = ver.parse()?;
138 Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
139 }
140}
141
impl LocationKey {
    /// The key with index 1 and version 1 (index in the low 32 bits, version in the high
    /// 32 bits — the same packing the `FromStr` impl uses).
    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001));

    /// Test-only key with index 1 and version 255.
    #[cfg(test)]
    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000001));

    /// Test-only key with index 2 and version 255.
    #[cfg(test)]
    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000002));
}
155
156impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
158 type O = LocationKey;
159
160 fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
161 where
162 Self: Sized,
163 {
164 let root = get_this_crate();
165 let n = Key::data(&self).as_ffi();
166 (
167 QuoteTokens {
168 prelude: None,
169 expr: Some(quote! {
170 #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
171 }),
172 },
173 (),
174 )
175 }
176}
177
/// The kind of a location.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
pub enum LocationType {
    /// A process location (see [`Process`]).
    Process,
    /// A cluster location (see [`Cluster`]).
    Cluster,
    /// An external location (see [`External`]).
    External,
}
188
/// Marker trait for top-level locations. Several source constructors on [`Location`]
/// require `Self: TopLevel<'a>`.
pub trait TopLevel<'a>: Location<'a> {}
191
192#[expect(
206 private_bounds,
207 reason = "only internal Hydro code can define location types"
208)]
209pub trait Location<'a>: DynLocation {
    /// The location type returned by [`Location::root`].
    type Root: Location<'a>;
215
    /// The location type returned by [`Location::drop_consistency`]. The bound requires
    /// that dropping consistency a second time yields this same type.
    type DropConsistency: Location<'a, DropConsistency = Self::DropConsistency>;
218
    /// Returns the root location of this location.
    // NOTE(review): presumably the enclosing top-level location for nested ones — confirm
    // against the implementors.
    fn root(&self) -> Self::Root;
224
    /// Returns this location as its [`Location::DropConsistency`] form.
    fn drop_consistency(&self) -> Self::DropConsistency;
    /// Returns the [`ClusterConsistency`] associated with this location type, if any.
    fn consistency() -> Option<ClusterConsistency>;
229
230 fn with_consistency_of<L2: Location<'a, DropConsistency = Self::DropConsistency>>(&self) -> L2 {
232 L2::from_drop_consistency(self.drop_consistency())
233 }
234
    /// Rebuilds `Self` from its [`Location::DropConsistency`] form; used by
    /// [`Location::with_consistency_of`].
    #[doc(hidden)]
    fn from_drop_consistency(l2: Self::DropConsistency) -> Self;
237
238 fn try_tick(&self) -> Option<Tick<Self>> {
245 if Self::is_top_level() {
246 let id = self.flow_state().borrow_mut().next_clock_id();
247 Some(Tick {
248 id,
249 l: self.clone(),
250 })
251 } else {
252 None
253 }
254 }
255
256 fn id(&self) -> LocationId {
258 DynLocation::dyn_id(self)
259 }
260
261 fn tick(&self) -> Tick<Self> {
287 if let LocationId::Tick(_, _) = self.id() {
288 panic!("cannot create nested ticks");
289 }
290
291 let id = self.flow_state().borrow_mut().next_clock_id();
292 Tick {
293 id,
294 l: self.clone(),
295 }
296 }
297
298 fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
323 where
324 Self: TopLevel<'a> + Sized,
325 {
326 Stream::new(
327 self.clone(),
328 HydroNode::Source {
329 source: HydroSource::Spin(),
330 metadata: self.new_node_metadata(Stream::<
331 (),
332 Self,
333 Unbounded,
334 TotalOrder,
335 ExactlyOnce,
336 >::collection_kind()),
337 },
338 )
339 }
340
341 fn source_stream<T, E>(
362 &self,
363 e: impl QuotedWithContext<'a, E, Self>,
364 ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
365 where
366 E: FuturesStream<Item = T> + Unpin,
367 Self: TopLevel<'a> + Sized,
368 {
369 let e = e.splice_untyped_ctx(self);
370
371 let target_location = self.drop_consistency();
372 Stream::new(
373 target_location.clone(),
374 HydroNode::Source {
375 source: HydroSource::Stream(e.into()),
376 metadata: target_location.new_node_metadata(Stream::<
377 T,
378 Self::DropConsistency,
379 Unbounded,
380 TotalOrder,
381 ExactlyOnce,
382 >::collection_kind()),
383 },
384 )
385 }
386
387 fn source_iter<T, E>(
409 &self,
410 e: impl QuotedWithContext<'a, E, Self>,
411 ) -> Stream<T, Self::DropConsistency, Bounded, TotalOrder, ExactlyOnce>
412 where
413 E: IntoIterator<Item = T>,
414 Self: Sized,
415 {
416 let e = e.splice_typed_ctx(self);
417
418 let target_location = self.drop_consistency();
419 Stream::new(
420 target_location.clone(),
421 HydroNode::Source {
422 source: HydroSource::Iter(e.into()),
423 metadata: target_location.new_node_metadata(Stream::<
424 T,
425 Self::DropConsistency,
426 Bounded,
427 TotalOrder,
428 ExactlyOnce,
429 >::collection_kind()),
430 },
431 )
432 }
433
    /// Deprecated alias for [`Location::source_cluster_membership_stream`]; forwards both
    /// arguments unchanged and returns its result.
    #[deprecated(note = "use .source_cluster_membership_stream(...) instead")]
    fn source_cluster_members<C: 'a>(
        &self,
        cluster: &Cluster<'a, C>,
        nondet_start: NonDet,
    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
    where
        Self: TopLevel<'a> + Sized,
    {
        self.source_cluster_membership_stream(cluster, nondet_start)
    }
483
484 fn source_cluster_membership_stream<C: 'a>(
523 &self,
524 cluster: &Cluster<'a, C>,
525 _nondet_start: NonDet,
526 ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
527 where
528 Self: TopLevel<'a> + Sized,
529 {
530 let target_consistency = self.drop_consistency();
531 Stream::new(
532 target_consistency.clone(),
533 HydroNode::Source {
534 source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
535 metadata: target_consistency.new_node_metadata(Stream::<
536 (TaglessMemberId, MembershipEvent),
537 Self,
538 Unbounded,
539 TotalOrder,
540 ExactlyOnce,
541 >::collection_kind(
542 )),
543 },
544 )
545 .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
546 .into_keyed()
547 }
548
549 #[cfg(feature = "tokio")]
557 fn source_external_bytes<L>(
558 &self,
559 from: &External<L>,
560 ) -> (
561 ExternalBytesPort,
562 Stream<BytesMut, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
563 )
564 where
565 Self: TopLevel<'a> + Sized,
566 {
567 let (port, stream, sink) =
568 self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
569
570 sink.complete(stream.location().source_iter(q!([])));
571
572 (port, stream)
573 }
574
575 #[cfg(feature = "tokio")]
582 fn source_external_bincode<L, T, O: Ordering, R: Retries>(
583 &self,
584 from: &External<L>,
585 ) -> (
586 ExternalBincodeSink<T, NotMany, O, R>,
587 Stream<T, Self::DropConsistency, Unbounded, O, R>,
588 )
589 where
590 Self: TopLevel<'a> + Sized,
591 T: Serialize + DeserializeOwned,
592 {
593 let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
594 sink.complete(stream.location().source_iter(q!([])));
595
596 (
597 ExternalBincodeSink {
598 process_key: from.key,
599 port_id: port.port_id,
600 _phantom: PhantomData,
601 },
602 stream.weaken_ordering().weaken_retries(),
603 )
604 }
605
606 #[cfg(feature = "sim")]
611 fn sim_input<T, O: Ordering, R: Retries>(
612 &self,
613 ) -> (
614 SimSender<T, O, R>,
615 Stream<T, Self::DropConsistency, Unbounded, O, R>,
616 )
617 where
618 Self: TopLevel<'a> + Sized,
619 T: Serialize + DeserializeOwned,
620 {
621 let external_location: External<'a, ()> = External {
622 key: LocationKey::FIRST,
623 flow_state: self.flow_state().clone(),
624 _phantom: PhantomData,
625 };
626
627 let (external, stream) = self.source_external_bincode(&external_location);
628
629 (SimSender(external.port_id, PhantomData), stream)
630 }
631
632 fn embedded_input<T>(
638 &self,
639 name: impl Into<String>,
640 ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
641 where
642 Self: TopLevel<'a> + Sized,
643 {
644 let ident = syn::Ident::new(&name.into(), Span::call_site());
645
646 let target_location = self.drop_consistency();
647 Stream::new(
648 target_location.clone(),
649 HydroNode::Source {
650 source: HydroSource::Embedded(ident),
651 metadata: target_location.new_node_metadata(Stream::<
652 T,
653 Self,
654 Unbounded,
655 TotalOrder,
656 ExactlyOnce,
657 >::collection_kind()),
658 },
659 )
660 }
661
662 fn embedded_singleton_input<T>(
668 &self,
669 name: impl Into<String>,
670 ) -> Singleton<T, Self::DropConsistency, Bounded>
671 where
672 Self: TopLevel<'a> + Sized,
673 {
674 let ident = syn::Ident::new(&name.into(), Span::call_site());
675
676 let target_location = self.drop_consistency();
677 Singleton::new(
678 target_location.clone(),
679 HydroNode::Source {
680 source: HydroSource::EmbeddedSingleton(ident),
681 metadata: target_location
682 .new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
683 },
684 )
685 }
686
687 #[cfg(feature = "tokio")]
732 #[expect(clippy::type_complexity, reason = "stream markers")]
733 fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
734 &self,
735 from: &External<L>,
736 port_hint: NetworkHint,
737 ) -> (
738 ExternalBytesPort<NotMany>,
739 Stream<<Codec as Decoder>::Item, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
740 ForwardHandle<'a, Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
741 )
742 where
743 Self: TopLevel<'a> + Sized,
744 {
745 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
746 let target_consistency = self.drop_consistency();
747
748 let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
749 T,
750 Self::DropConsistency,
751 Unbounded,
752 TotalOrder,
753 ExactlyOnce,
754 >>();
755 let mut flow_state_borrow = self.flow_state().borrow_mut();
756
757 flow_state_borrow.push_root(HydroRoot::SendExternal {
758 to_external_key: from.key,
759 to_port_id: next_external_port_id,
760 to_many: false,
761 unpaired: false,
762 serialize_fn: None,
763 instantiate_fn: DebugInstantiate::Building,
764 input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
765 op_metadata: HydroIrOpMetadata::new(),
766 });
767
768 let raw_stream: Stream<
769 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
770 Self::DropConsistency,
771 Unbounded,
772 TotalOrder,
773 ExactlyOnce,
774 > = Stream::new(
775 target_consistency.clone(),
776 HydroNode::ExternalInput {
777 from_external_key: from.key,
778 from_port_id: next_external_port_id,
779 from_many: false,
780 codec_type: quote_type::<Codec>().into(),
781 port_hint,
782 instantiate_fn: DebugInstantiate::Building,
783 deserialize_fn: None,
784 metadata: target_consistency.new_node_metadata(Stream::<
785 Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
786 Self::DropConsistency,
787 Unbounded,
788 TotalOrder,
789 ExactlyOnce,
790 >::collection_kind(
791 )),
792 },
793 );
794
795 (
796 ExternalBytesPort {
797 process_key: from.key,
798 port_id: next_external_port_id,
799 _phantom: PhantomData,
800 },
801 raw_stream.flatten_ordered(),
802 fwd_ref,
803 )
804 }
805
806 #[cfg(feature = "tokio")]
816 #[expect(clippy::type_complexity, reason = "stream markers")]
817 fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
818 &self,
819 from: &External<L>,
820 ) -> (
821 ExternalBincodeBidi<InT, OutT, NotMany>,
822 Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
823 ForwardHandle<'a, Stream<OutT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
824 )
825 where
826 Self: TopLevel<'a> + Sized,
827 {
828 let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
829
830 let target_consistency = self.drop_consistency();
831 let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
832 OutT,
833 Self::DropConsistency,
834 Unbounded,
835 TotalOrder,
836 ExactlyOnce,
837 >>();
838 let mut flow_state_borrow = self.flow_state().borrow_mut();
839
840 let root = get_this_crate();
841
842 let out_t_type = quote_type::<OutT>();
843 let ser_fn: syn::Expr = syn::parse_quote! {
844 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
845 |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
846 )
847 };
848
849 flow_state_borrow.push_root(HydroRoot::SendExternal {
850 to_external_key: from.key,
851 to_port_id: next_external_port_id,
852 to_many: false,
853 unpaired: false,
854 serialize_fn: Some(ser_fn.into()),
855 instantiate_fn: DebugInstantiate::Building,
856 input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
857 op_metadata: HydroIrOpMetadata::new(),
858 });
859
860 let in_t_type = quote_type::<InT>();
861
862 let deser_fn: syn::Expr = syn::parse_quote! {
863 |res| {
864 let b = res.unwrap();
865 #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
866 }
867 };
868
869 let raw_stream: Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce> =
870 Stream::new(
871 target_consistency.clone(),
872 HydroNode::ExternalInput {
873 from_external_key: from.key,
874 from_port_id: next_external_port_id,
875 from_many: false,
876 codec_type: quote_type::<LengthDelimitedCodec>().into(),
877 port_hint: NetworkHint::Auto,
878 instantiate_fn: DebugInstantiate::Building,
879 deserialize_fn: Some(deser_fn.into()),
880 metadata: target_consistency.new_node_metadata(Stream::<
881 InT,
882 Self::DropConsistency,
883 Unbounded,
884 TotalOrder,
885 ExactlyOnce,
886 >::collection_kind(
887 )),
888 },
889 );
890
891 (
892 ExternalBincodeBidi {
893 process_key: from.key,
894 port_id: next_external_port_id,
895 _phantom: PhantomData,
896 },
897 raw_stream,
898 fwd_ref,
899 )
900 }
901
    /// Binds a bidirectional port that accepts many clients of the external `from`, framed
    /// with `Codec`.
    ///
    /// Returns, in order:
    /// 1. the port handle for the external side,
    /// 2. received items keyed by a `u64` per-connection key,
    /// 3. [`MembershipEvent`]s keyed by the same `u64` key,
    /// 4. a [`ForwardHandle`] to complete with the keyed stream of items to send.
    #[cfg(feature = "tokio")]
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
        &self,
        from: &External<L>,
        port_hint: NetworkHint,
    ) -> (
        ExternalBytesPort<Many>,
        KeyedStream<
            u64,
            <Codec as Decoder>::Item,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        >,
        KeyedStream<
            u64,
            MembershipEvent,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        >,
        ForwardHandle<
            'a,
            KeyedStream<u64, T, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
        >,
    )
    where
        Self: TopLevel<'a> + Sized,
    {
        // Allocate the port id from the external's flow state.
        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();

        let target_consistency = self.drop_consistency();
        // Forward reference for the outgoing (to-external) keyed stream.
        let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
            u64,
            T,
            Self::DropConsistency,
            Unbounded,
            NoOrder,
            ExactlyOnce,
        >>();
        // This mutable borrow stays alive until the end of the function.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        // Outgoing half: the keyed stream is flattened to entries and sent to many clients.
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_key: from.key,
            to_port_id: next_external_port_id,
            to_many: true,
            unpaired: false,
            serialize_fn: None,
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
            op_metadata: HydroIrOpMetadata::new(),
        });

        // Incoming half: raw `(key, item)` decoder results from all clients.
        let raw_stream: Stream<
            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = Stream::new(
            target_consistency.clone(),
            HydroNode::ExternalInput {
                from_external_key: from.key,
                from_port_id: next_external_port_id,
                from_many: true,
                codec_type: quote_type::<Codec>().into(),
                port_hint,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: None,
                metadata: target_consistency.new_node_metadata(Stream::<
                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
                    Self::DropConsistency,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        // The membership stream is referenced by an identifier derived from the external's
        // key and the port id.
        let membership_stream_ident = syn::Ident::new(
            &format!(
                "__hydro_deploy_many_{}_{}_membership",
                from.key, next_external_port_id
            ),
            Span::call_site(),
        );
        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
        let raw_membership_stream: KeyedStream<
            u64,
            bool,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = KeyedStream::new(
            target_consistency.clone(),
            HydroNode::Source {
                source: HydroSource::Stream(membership_stream_expr.into()),
                metadata: target_consistency.new_node_metadata(KeyedStream::<
                    u64,
                    bool,
                    Self::DropConsistency,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        (
            ExternalBytesPort {
                process_key: from.key,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            raw_stream
                .flatten_ordered()
                .into_keyed(),
            // `true` means the client joined, `false` means it left.
            raw_membership_stream.map(q!(|join| {
                if join {
                    MembershipEvent::Joined
                } else {
                    MembershipEvent::Left
                }
            })),
            fwd_ref,
        )
    }
1046
    /// Binds a bidirectional port that accepts many clients of the external `from`,
    /// exchanging bincode-encoded values over a length-delimited framing.
    ///
    /// Returns, in order:
    /// 1. the port handle for the external side,
    /// 2. received `InT` values keyed by a `u64` per-connection key,
    /// 3. [`MembershipEvent`]s keyed by the same `u64` key,
    /// 4. a [`ForwardHandle`] to complete with the keyed stream of `OutT` values to send.
    ///
    /// The generated serialize and deserialize closures call `unwrap()` on the bincode and
    /// framing results.
    #[cfg(feature = "tokio")]
    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
        &self,
        from: &External<L>,
    ) -> (
        ExternalBincodeBidi<InT, OutT, Many>,
        KeyedStream<u64, InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
        KeyedStream<
            u64,
            MembershipEvent,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        >,
        ForwardHandle<
            'a,
            KeyedStream<u64, OutT, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
        >,
    )
    where
        Self: TopLevel<'a> + Sized,
    {
        // Allocate the port id from the external's flow state.
        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();

        let target_consistency = self.drop_consistency();
        // Forward reference for the outgoing (to-external) keyed stream.
        let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
            u64,
            OutT,
            Self::DropConsistency,
            Unbounded,
            NoOrder,
            ExactlyOnce,
        >>();
        // This mutable borrow stays alive until the end of the function.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        let root = get_this_crate();

        // Serializer: keeps the connection key and bincode-encodes the payload.
        let out_t_type = quote_type::<OutT>();
        let ser_fn: syn::Expr = syn::parse_quote! {
            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
            )
        };

        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_key: from.key,
            to_port_id: next_external_port_id,
            to_many: true,
            unpaired: false,
            serialize_fn: Some(ser_fn.into()),
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
            op_metadata: HydroIrOpMetadata::new(),
        });

        let in_t_type = quote_type::<InT>();

        // Deserializer: unwraps the framed result, keeps the key, decodes the payload.
        let deser_fn: syn::Expr = syn::parse_quote! {
            |res| {
                let (id, b) = res.unwrap();
                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
            }
        };

        let raw_stream: KeyedStream<
            u64,
            InT,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = KeyedStream::new(
            target_consistency.clone(),
            HydroNode::ExternalInput {
                from_external_key: from.key,
                from_port_id: next_external_port_id,
                from_many: true,
                codec_type: quote_type::<LengthDelimitedCodec>().into(),
                port_hint: NetworkHint::Auto,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: Some(deser_fn.into()),
                metadata: target_consistency.new_node_metadata(KeyedStream::<
                    u64,
                    InT,
                    Self::DropConsistency,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        // The membership stream is referenced by an identifier derived from the external's
        // key and the port id.
        let membership_stream_ident = syn::Ident::new(
            &format!(
                "__hydro_deploy_many_{}_{}_membership",
                from.key, next_external_port_id
            ),
            Span::call_site(),
        );
        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
        let raw_membership_stream: KeyedStream<
            u64,
            bool,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = KeyedStream::new(
            target_consistency.clone(),
            HydroNode::Source {
                source: HydroSource::Stream(membership_stream_expr.into()),
                metadata: target_consistency.new_node_metadata(KeyedStream::<
                    u64,
                    bool,
                    Self::DropConsistency,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind()),
            },
        );

        (
            ExternalBincodeBidi {
                process_key: from.key,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            raw_stream,
            // `true` means the client joined, `false` means it left.
            raw_membership_stream.map(q!(|join| {
                if join {
                    MembershipEvent::Joined
                } else {
                    MembershipEvent::Left
                }
            })),
            fwd_ref,
        )
    }
1205
1206 fn sidecar_bidi<InT: 'static, OutT: 'static, F>(
1259 &self,
1260 sidecar: impl QuotedWithContext<'a, F, Self>,
1261 ) -> (
1262 Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
1263 ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1264 )
1265 where
1266 Self: Sized + TopLevel<'a>,
1267 {
1268 let location_key = Location::id(self).key();
1269
1270 let sidecar_id = self.flow_state().borrow_mut().next_sidecar_id();
1271 let (stream_ident, sink_ident) = sidecar_id.idents();
1272
1273 let sidecar_closure: syn::Expr = sidecar.splice_untyped_ctx(self);
1274 self.flow_state()
1275 .borrow_mut()
1276 .sidecars
1277 .push(crate::compile::builder::Sidecar::Bidi {
1278 location_key,
1279 sidecar_id,
1280 sidecar_closure: Box::new(sidecar_closure),
1281 });
1282
1283 let source_expr: syn::Expr = parse_quote! {
1285 #stream_ident
1286 };
1287 let inbound: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
1288 self.clone(),
1289 HydroNode::Source {
1290 source: HydroSource::Stream(source_expr.into()),
1291 metadata: self.new_node_metadata(Stream::<
1292 InT,
1293 Self,
1294 Unbounded, TotalOrder, ExactlyOnce,
1297 >::collection_kind()),
1298 },
1299 );
1300
1301 let (fwd_ref, to_sink): (
1303 ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1304 Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>,
1305 ) = self.forward_ref();
1306
1307 let sink_expr: syn::Expr = parse_quote! {
1308 #sink_ident
1309 };
1310
1311 let sink_input_ir = to_sink.ir_node.replace(HydroNode::Placeholder);
1312 self.flow_state()
1313 .borrow_mut()
1314 .try_push_root(HydroRoot::DestSink {
1315 sink: sink_expr.into(),
1316 input: Box::new(sink_input_ir),
1317 op_metadata: HydroIrOpMetadata::new(),
1318 });
1319
1320 (inbound, fwd_ref)
1321 }
1322
1323 fn singleton<T>(
1343 &self,
1344 e: impl QuotedWithContext<'a, T, Self>,
1345 ) -> Singleton<T, Self::DropConsistency, Bounded>
1346 where
1347 Self: Sized,
1348 {
1349 let e = e.splice_untyped_ctx(self);
1350
1351 let target_location = self.drop_consistency();
1352 Singleton::new(
1353 target_location.clone(),
1354 HydroNode::SingletonSource {
1355 value: e.into(),
1356 first_tick_only: false,
1357 metadata: target_location.new_node_metadata(Singleton::<
1358 T,
1359 Self::DropConsistency,
1360 Bounded,
1361 >::collection_kind()),
1362 },
1363 )
1364 }
1365
1366 fn singleton_future<F>(
1389 &self,
1390 e: impl QuotedWithContext<'a, F, Self>,
1391 ) -> Singleton<F::Output, Self::DropConsistency, Bounded>
1392 where
1393 F: Future,
1394 Self: Sized,
1395 {
1396 self.singleton(e).resolve_future_blocking()
1397 }
1398
1399 #[cfg(feature = "tokio")]
1408 fn source_interval(
1409 &self,
1410 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1411 ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1412 where
1413 Self: TopLevel<'a> + Sized,
1414 {
1415 self.source_stream(q!(tokio_stream::StreamExt::map(
1416 tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(interval)),
1417 |_| ()
1418 )))
1419 .assert_has_consistency_of_trusted(
1420 manual_proof!(),
1421 )
1422 }
1423
1424 #[cfg(feature = "tokio")]
1431 fn source_interval_delayed(
1432 &self,
1433 delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1434 interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1435 ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1436 where
1437 Self: TopLevel<'a> + Sized,
1438 {
1439 self.source_stream(q!(tokio_stream::StreamExt::map(
1440 tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
1441 tokio::time::Instant::now() + delay,
1442 interval,
1443 )),
1444 |_| ()
1445 )))
1446 .assert_has_consistency_of_trusted(
1447 manual_proof!(),
1448 )
1449 }
1450
1451 fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1491 where
1492 S: CycleCollection<'a, ForwardRef, Location = Self>,
1493 {
1494 let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1495 (
1496 ForwardHandle::new(cycle_id, Location::id(self)),
1497 S::create_source(cycle_id, self.clone()),
1498 )
1499 }
1500}
1501
1502#[cfg(feature = "deploy")]
1503#[cfg(test)]
1504mod tests {
1505 use std::collections::HashSet;
1506
1507 use futures::{SinkExt, StreamExt};
1508 use hydro_deploy::Deployment;
1509 use stageleft::q;
1510 use tokio_util::codec::LengthDelimitedCodec;
1511
1512 use crate::compile::builder::FlowBuilder;
1513 use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1514 use crate::location::{Location, NetworkHint};
1515 use crate::nondet::nondet;
1516
    /// A singleton created at the top level and snapshotted into a tick must contribute
    /// exactly one value per tick: each input `x` yields `((x, 123), 1)`.
    #[tokio::test]
    async fn top_level_singleton_replay_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let singleton = node.singleton(q!(123));
        let tick = node.tick();
        // Pair each input with the singleton's value and with the count of its snapshot.
        let out = input
            .batch(&tick, nondet!())
            .cross_singleton(singleton.clone().snapshot(&tick, nondet!()))
            .cross_singleton(
                singleton
                    .snapshot(&tick, nondet!())
                    .into_stream()
                    .count(),
            )
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Ports are connected before the deployment is started.
        let mut external_in = nodes.connect(in_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));

        // A second input checks that the count is still 1.
        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
    }
1559
    /// A singleton created directly inside a tick must contribute exactly one value per
    /// tick: each input `x` yields `((x, 123), 1)`.
    #[tokio::test]
    async fn tick_singleton_replay_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input) =
            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
        let tick = node.tick();
        let singleton = tick.singleton(q!(123));
        // Pair each input with the singleton's value and with the count of its stream form.
        let out = input
            .batch(&tick, nondet!())
            .cross_singleton(singleton.clone())
            .cross_singleton(singleton.into_stream().count())
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Ports are connected before the deployment is started.
        let mut external_in = nodes.connect(in_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(1).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));

        // A second input checks that the count is still 1.
        external_in.send(2).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
    }
1597
    /// Bytes sent through `source_external_bytes` are echoed back unchanged over a bincode
    /// output port.
    #[tokio::test]
    async fn external_bytes() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input) = first_node.source_external_bytes(&external);
        let out = input.send_bincode_external(&external);

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Only the second element of the connected pair is used, for sending.
        let mut external_in = nodes.connect(in_port).await.1;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in.send(vec![1, 2, 3].into()).await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
    }
1625
    /// Two clients connect to the same many-client bincode port; their messages arrive
    /// keyed `0` and `1` in connection order.
    #[tokio::test]
    async fn multi_external_source() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let first_node = flow.process::<()>();
        let external = flow.external::<()>();

        let (in_port, input, _membership, complete_sink) =
            first_node.bidi_external_many_bincode(&external);
        let out = input.entries().send_bincode_external(&external);
        // Nothing is sent back to the clients: complete the sink with an empty stream.
        complete_sink.complete(
            first_node
                .source_iter::<(u64, ()), _>(q!([]))
                .into_keyed()
                .weaken_ordering(),
        );

        let nodes = flow
            .with_process(&first_node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // The same port is connected twice, once per client.
        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
        let external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        external_in_1.send(123).await.unwrap();
        external_in_2.send(456).await.unwrap();

        // Arrival order across clients is not asserted, so compare as sets.
        assert_eq!(
            external_out.take(2).collect::<HashSet<_>>().await,
            vec![(0, 123), (1, 456)].into_iter().collect()
        );
    }
1665
1666 #[tokio::test]
1667 async fn second_connection_only_multi_source() {
1668 let mut deployment = Deployment::new();
1669
1670 let mut flow = FlowBuilder::new();
1671 let first_node = flow.process::<()>();
1672 let external = flow.external::<()>();
1673
1674 let (in_port, input, _membership, complete_sink) =
1675 first_node.bidi_external_many_bincode(&external);
1676 let out = input.entries().send_bincode_external(&external);
1677 complete_sink.complete(
1678 first_node
1679 .source_iter::<(u64, ()), _>(q!([]))
1680 .into_keyed()
1681 .weaken_ordering(),
1682 );
1683
1684 let nodes = flow
1685 .with_process(&first_node, deployment.Localhost())
1686 .with_external(&external, deployment.Localhost())
1687 .deploy(&mut deployment);
1688
1689 deployment.deploy().await.unwrap();
1690
1691 let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1693 let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1694 let mut external_out = nodes.connect(out).await;
1695
1696 deployment.start().await.unwrap();
1697
1698 external_in_2.send(456).await.unwrap();
1699
1700 assert_eq!(external_out.next().await.unwrap(), (1, 456));
1701 }
1702
1703 #[tokio::test]
1704 async fn multi_external_bytes() {
1705 let mut deployment = Deployment::new();
1706
1707 let mut flow = FlowBuilder::new();
1708 let first_node = flow.process::<()>();
1709 let external = flow.external::<()>();
1710
1711 let (in_port, input, _membership, complete_sink) = first_node
1712 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1713 let out = input.entries().send_bincode_external(&external);
1714 complete_sink.complete(
1715 first_node
1716 .source_iter(q!([]))
1717 .into_keyed()
1718 .weaken_ordering(),
1719 );
1720
1721 let nodes = flow
1722 .with_process(&first_node, deployment.Localhost())
1723 .with_external(&external, deployment.Localhost())
1724 .deploy(&mut deployment);
1725
1726 deployment.deploy().await.unwrap();
1727
1728 let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1729 let mut external_in_2 = nodes.connect(in_port).await.1;
1730 let external_out = nodes.connect(out).await;
1731
1732 deployment.start().await.unwrap();
1733
1734 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1735 external_in_2.send(vec![4, 5].into()).await.unwrap();
1736
1737 assert_eq!(
1738 external_out.take(2).collect::<HashSet<_>>().await,
1739 vec![
1740 (0, (&[1u8, 2, 3] as &[u8]).into()),
1741 (1, (&[4u8, 5] as &[u8]).into())
1742 ]
1743 .into_iter()
1744 .collect()
1745 );
1746 }
1747
1748 #[tokio::test]
1749 async fn single_client_external_bytes() {
1750 let mut deployment = Deployment::new();
1751 let mut flow = FlowBuilder::new();
1752 let first_node = flow.process::<()>();
1753 let external = flow.external::<()>();
1754 let (port, input, complete_sink) = first_node
1755 .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1756 complete_sink.complete(input.map(q!(|data| {
1757 let mut resp: Vec<u8> = data.into();
1758 resp.push(42);
1759 resp.into() })));
1761
1762 let nodes = flow
1763 .with_process(&first_node, deployment.Localhost())
1764 .with_external(&external, deployment.Localhost())
1765 .deploy(&mut deployment);
1766
1767 deployment.deploy().await.unwrap();
1768 deployment.start().await.unwrap();
1769
1770 let (mut external_out, mut external_in) = nodes.connect(port).await;
1771
1772 external_in.send(vec![1, 2, 3].into()).await.unwrap();
1773 assert_eq!(
1774 external_out.next().await.unwrap().unwrap(),
1775 vec![1, 2, 3, 42]
1776 );
1777 }
1778
1779 #[tokio::test]
1780 async fn echo_external_bytes() {
1781 let mut deployment = Deployment::new();
1782
1783 let mut flow = FlowBuilder::new();
1784 let first_node = flow.process::<()>();
1785 let external = flow.external::<()>();
1786
1787 let (port, input, _membership, complete_sink) = first_node
1788 .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1789 complete_sink
1790 .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1791
1792 let nodes = flow
1793 .with_process(&first_node, deployment.Localhost())
1794 .with_external(&external, deployment.Localhost())
1795 .deploy(&mut deployment);
1796
1797 deployment.deploy().await.unwrap();
1798
1799 let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1800 let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1801
1802 deployment.start().await.unwrap();
1803
1804 external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1805 external_in_2.send(vec![4, 5].into()).await.unwrap();
1806
1807 assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1808 assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1809 }
1810
1811 #[tokio::test]
1812 async fn echo_external_bincode() {
1813 let mut deployment = Deployment::new();
1814
1815 let mut flow = FlowBuilder::new();
1816 let first_node = flow.process::<()>();
1817 let external = flow.external::<()>();
1818
1819 let (port, input, _membership, complete_sink) =
1820 first_node.bidi_external_many_bincode(&external);
1821 complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1822
1823 let nodes = flow
1824 .with_process(&first_node, deployment.Localhost())
1825 .with_external(&external, deployment.Localhost())
1826 .deploy(&mut deployment);
1827
1828 deployment.deploy().await.unwrap();
1829
1830 let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1831 let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1832
1833 deployment.start().await.unwrap();
1834
1835 external_in_1.send("hi".to_owned()).await.unwrap();
1836 external_in_2.send("hello".to_owned()).await.unwrap();
1837
1838 assert_eq!(external_out_1.next().await.unwrap(), "HI");
1839 assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1840 }
1841
1842 #[tokio::test]
1843 async fn closure_location_name() {
1844 let mut deployment = Deployment::new();
1845 let mut flow = FlowBuilder::new();
1846
1847 enum ClosureProcess {}
1848
1849 let node = flow.process::<ClosureProcess>();
1850 let external = flow.external::<()>();
1851
1852 let (in_port, input) =
1853 node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
1854 let out = input.send_bincode_external(&external);
1855
1856 let nodes = flow
1857 .with_process(&node, deployment.Localhost())
1858 .with_external(&external, deployment.Localhost())
1859 .deploy(&mut deployment);
1860
1861 deployment.deploy().await.unwrap();
1862
1863 let mut external_in = nodes.connect(in_port).await;
1864 let mut external_out = nodes.connect(out).await;
1865
1866 deployment.start().await.unwrap();
1867
1868 external_in.send(42).await.unwrap();
1869 assert_eq!(external_out.next().await.unwrap(), 42);
1870 }
1871}