Skip to main content

hydro_lang/location/
mod.rs

1//! Type definitions for distributed locations, which specify where pieces of a Hydro
2//! program will be executed.
3//!
4//! Hydro is a **global**, **distributed** programming model. This means that the data
5//! and computation in a Hydro program can be spread across multiple machines, data
6//! centers, and even continents. To achieve this, Hydro uses the concept of
7//! **locations** to keep track of _where_ data is located and computation is executed.
8//!
9//! Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
10//! which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
11//! and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
12//! to allow live collections to be _moved_ between locations via network send/receive.
13//!
14//! See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
15
16use std::fmt::Debug;
17use std::future::Future;
18#[cfg(feature = "tokio")]
19use std::marker::PhantomData;
20use std::num::ParseIntError;
21#[cfg(feature = "tokio")]
22use std::time::Duration;
23
24#[cfg(feature = "tokio")]
25use bytes::{Bytes, BytesMut};
26use futures::stream::Stream as FuturesStream;
27use proc_macro2::Span;
28use quote::quote;
29#[cfg(feature = "tokio")]
30use serde::de::DeserializeOwned;
31use serde::{Deserialize, Serialize};
32use slotmap::{Key, new_key_type};
33#[cfg(feature = "tokio")]
34use stageleft::quote_type;
35use stageleft::runtime_support::{FreeVariableWithContextWithProps, QuoteTokens};
36use stageleft::{QuotedWithContext, q};
37use syn::parse_quote;
38#[cfg(feature = "tokio")]
39use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
40
41#[cfg(feature = "tokio")]
42use crate::compile::ir::DebugInstantiate;
43use crate::compile::ir::{
44    ClusterMembersState, HydroIrOpMetadata, HydroNode, HydroRoot, HydroSource,
45};
46use crate::forward_handle::ForwardRef;
47#[cfg(stageleft_runtime)]
48use crate::forward_handle::{CycleCollection, ForwardHandle};
49use crate::live_collections::boundedness::{Bounded, Unbounded};
50use crate::live_collections::keyed_stream::KeyedStream;
51use crate::live_collections::singleton::Singleton;
52use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
53#[cfg(feature = "tokio")]
54use crate::live_collections::stream::{Ordering, Retries};
55#[cfg(stageleft_runtime)]
56use crate::location::dynamic::DynLocation;
57use crate::location::dynamic::{ClusterConsistency, LocationId};
58#[cfg(feature = "tokio")]
59use crate::location::external_process::{
60    ExternalBincodeBidi, ExternalBincodeSink, ExternalBytesPort, Many, NotMany,
61};
62use crate::nondet::NonDet;
63#[cfg(feature = "tokio")]
64use crate::properties::manual_proof;
65#[cfg(feature = "sim")]
66use crate::sim::SimSender;
67use crate::staging_util::get_this_crate;
68
69pub mod dynamic;
70
71pub mod external_process;
72pub use external_process::External;
73
74pub mod process;
75pub use process::Process;
76
77pub mod cluster;
78pub use cluster::Cluster;
79
80pub mod member_id;
81pub use member_id::{MemberId, TaglessMemberId};
82
83pub mod tick;
84pub use tick::{Atomic, Tick};
85
/// An event indicating a change in membership status of a location in a group
/// (e.g. a node in a [`Cluster`] or an external client connection).
///
/// The event itself carries no member identity; APIs in this module pair it
/// with a key (for example a [`MemberId`]) to say *which* member changed.
#[derive(PartialEq, Eq, Clone, Debug, Hash, Serialize, Deserialize)]
pub enum MembershipEvent {
    /// The member has joined the group and is now active.
    Joined,
    /// The member has left the group and is no longer active.
    Left,
}
95
/// Transport configuration hint for an external connection.
///
/// The hint is forwarded to the code that sets up the underlying TCP listener
/// when external clients are bound through methods such as
/// [`Location::bind_single_client`] or [`Location::bidi_external_many_bytes`].
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum NetworkHint {
    /// Let the deployment pick the network configuration (e.g. an ephemeral port).
    Auto,
    /// Listen on a TCP port.
    ///
    /// `None` means any available port may be chosen automatically;
    /// `Some(port)` requests exactly that port number.
    TcpPort(Option<u16>),
}
111
112pub(crate) fn check_matching_location<'a, L: Location<'a>>(l1: &L, l2: &L) {
113    assert_eq!(Location::id(l1), Location::id(l2), "locations do not match");
114}
115
#[stageleft::export(LocationKey)]
new_key_type! {
    /// A unique key identifying a location.
    ///
    /// This is a `slotmap` key; its textual form (see the `Display` and
    /// `FromStr` impls) is `loc<index>v<version>`, e.g. `loc1v1`.
    pub struct LocationKey;
}
121
122impl std::fmt::Display for LocationKey {
123    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
124        write!(f, "loc{:?}", self.data()) // `"loc1v1"``
125    }
126}
127
128/// This is used for the ECS membership stream.
129/// TODO(mingwei): Make this more robust?
130impl std::str::FromStr for LocationKey {
131    type Err = Option<ParseIntError>;
132
133    fn from_str(s: &str) -> Result<Self, Self::Err> {
134        let nvn = s.strip_prefix("loc").ok_or(None)?;
135        let (idx, ver) = nvn.split_once("v").ok_or(None)?;
136        let idx: u64 = idx.parse()?;
137        let ver: u64 = ver.parse()?;
138        Ok(slotmap::KeyData::from_ffi((ver << 32) | idx).into())
139    }
140}
141
142impl LocationKey {
143    /// TODO(minwgei): Remove this and avoid magic key for simulator external.
144    /// The first location key, used by the simulator as the default external location.
145    pub const FIRST: Self = Self(slotmap::KeyData::from_ffi(0x0000000100000001)); // `1v1`
146
147    /// A key for testing with index 1.
148    #[cfg(test)]
149    pub const TEST_KEY_1: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000001)); // `1v255`
150
151    /// A key for testing with index 2.
152    #[cfg(test)]
153    pub const TEST_KEY_2: Self = Self(slotmap::KeyData::from_ffi(0x000000FF00000002)); // `2v255`
154}
155
156/// This is used within `q!` code in docker and ECS.
157impl<Ctx> FreeVariableWithContextWithProps<Ctx, ()> for LocationKey {
158    type O = LocationKey;
159
160    fn to_tokens(self, _ctx: &Ctx) -> (QuoteTokens, ())
161    where
162        Self: Sized,
163    {
164        let root = get_this_crate();
165        let n = Key::data(&self).as_ffi();
166        (
167            QuoteTokens {
168                prelude: None,
169                expr: Some(quote! {
170                    #root::location::LocationKey::from(#root::runtime_support::slotmap::KeyData::from_ffi(#n))
171                }),
172            },
173            (),
174        )
175    }
176}
177
/// A simple enum for the type of a root location.
///
/// Unlike the [`Process`], [`Cluster`] and [`External`] handle types, this is a
/// plain, field-less tag that can be copied, compared, hashed and serialized.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
pub enum LocationType {
    /// A process (single node).
    Process,
    /// A cluster (multiple nodes).
    Cluster,
    /// An external client.
    External,
}
188
/// A top-level location (i.e. a [`Process`] or [`Cluster`]) that is outside a tick / atomic region.
///
/// This is a marker trait: it declares no items of its own and is used as a
/// bound (`Self: TopLevel<'a>`) on [`Location`] methods that create sources.
pub trait TopLevel<'a>: Location<'a> {}
191
192/// A location where data can be materialized and computation can be executed.
193///
194/// Hydro is a **global**, **distributed** programming model. This means that the data
195/// and computation in a Hydro program can be spread across multiple machines, data
196/// centers, and even continents. To achieve this, Hydro uses the concept of
197/// **locations** to keep track of _where_ data is located and computation is executed.
198///
199/// Each live collection type (in [`crate::live_collections`]) has a type parameter `L`
200/// which will always be a type that implements the [`Location`] trait (e.g. [`Process`]
201/// and [`Cluster`]). To create distributed programs, Hydro provides a variety of APIs
202/// to allow live collections to be _moved_ between locations via network send/receive.
203///
204/// See [the Hydro docs](https://hydro.run/docs/hydro/reference/locations/) for more information.
205#[expect(
206    private_bounds,
207    reason = "only internal Hydro code can define location types"
208)]
209pub trait Location<'a>: DynLocation {
    /// The root location type for this location.
    ///
    /// For top-level locations like [`Process`] and [`Cluster`], this is `Self`.
    /// For nested locations like [`Tick`], this is the root location that contains it.
    type Root: Location<'a>;

    /// Location type with consistency guarantees dropped for the live collection on it.
    ///
    /// The bound makes dropping consistency a fixed point at the type level:
    /// the `DropConsistency` of this type is required to be this type itself.
    type DropConsistency: Location<'a, DropConsistency = Self::DropConsistency>;

    /// Returns the root location for this location.
    ///
    /// For top-level locations like [`Process`] and [`Cluster`], this returns `self`.
    /// For nested locations like [`Tick`], this returns the root location that contains it.
    fn root(&self) -> Self::Root;

    /// Returns this location with consistency guarantees dropped for the live collection.
    fn drop_consistency(&self) -> Self::DropConsistency;
    /// Gets the runtime enum variant for the current consistency level, if this is a cluster.
    ///
    /// This is an associated function (no `self`): the answer is determined by
    /// the location *type*, not by a particular value.
    fn consistency() -> Option<ClusterConsistency>;
229
230    /// Updates the consistency guarantees to match that of the given location.
231    fn with_consistency_of<L2: Location<'a, DropConsistency = Self::DropConsistency>>(&self) -> L2 {
232        L2::from_drop_consistency(self.drop_consistency())
233    }
234
    /// Rebuilds `Self` from its consistency-dropped form.
    ///
    /// Used by [`Location::with_consistency_of`]; hidden from the public docs.
    #[doc(hidden)]
    fn from_drop_consistency(l2: Self::DropConsistency) -> Self;
237
238    /// Attempts to create a new [`Tick`] clock domain at this location.
239    ///
240    /// Returns `Some(Tick)` if this is a top-level location (like [`Process`] or [`Cluster`]),
241    /// or `None` if this location is already inside a tick (nested ticks are not supported).
242    ///
243    /// Prefer using [`Location::tick`] when you know the location is top-level.
244    fn try_tick(&self) -> Option<Tick<Self>> {
245        if Self::is_top_level() {
246            let id = self.flow_state().borrow_mut().next_clock_id();
247            Some(Tick {
248                id,
249                l: self.clone(),
250            })
251        } else {
252            None
253        }
254    }
255
256    /// Returns the unique identifier for this location.
257    fn id(&self) -> LocationId {
258        DynLocation::dyn_id(self)
259    }
260
261    /// Creates a new [`Tick`] clock domain at this location.
262    ///
263    /// A tick represents a logical clock that can be used to batch streaming data
264    /// into discrete time steps. This is useful for implementing iterative algorithms
265    /// or for synchronizing data across multiple streams.
266    ///
267    /// # Example
268    /// ```rust
269    /// # #[cfg(feature = "deploy")] {
270    /// # use hydro_lang::prelude::*;
271    /// # use futures::StreamExt;
272    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
273    /// let tick = process.tick();
274    /// let inside_tick = process
275    ///     .source_iter(q!(vec![1, 2, 3, 4]))
276    ///     .batch(&tick, nondet!(/** test */));
277    /// inside_tick.all_ticks()
278    /// # }, |mut stream| async move {
279    /// // 1, 2, 3, 4
280    /// # for w in vec![1, 2, 3, 4] {
281    /// #     assert_eq!(stream.next().await.unwrap(), w);
282    /// # }
283    /// # }));
284    /// # }
285    /// ```
286    fn tick(&self) -> Tick<Self> {
287        if let LocationId::Tick(_, _) = self.id() {
288            panic!("cannot create nested ticks");
289        }
290
291        let id = self.flow_state().borrow_mut().next_clock_id();
292        Tick {
293            id,
294            l: self.clone(),
295        }
296    }
297
298    /// Creates an unbounded stream that continuously emits unit values `()`.
299    ///
300    /// This is useful for driving computations that need to run continuously,
301    /// such as polling or heartbeat mechanisms.
302    ///
303    /// # Example
304    /// ```rust
305    /// # #[cfg(feature = "deploy")] {
306    /// # use hydro_lang::prelude::*;
307    /// # use futures::StreamExt;
308    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
309    /// let tick = process.tick();
310    /// process.spin()
311    ///     .batch(&tick, nondet!(/** test */))
312    ///     .map(q!(|_| 42))
313    ///     .all_ticks()
314    /// # }, |mut stream| async move {
315    /// // 42, 42, 42, ...
316    /// # assert_eq!(stream.next().await.unwrap(), 42);
317    /// # assert_eq!(stream.next().await.unwrap(), 42);
318    /// # assert_eq!(stream.next().await.unwrap(), 42);
319    /// # }));
320    /// # }
321    /// ```
322    fn spin(&self) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
323    where
324        Self: TopLevel<'a> + Sized,
325    {
326        Stream::new(
327            self.clone(),
328            HydroNode::Source {
329                source: HydroSource::Spin(),
330                metadata: self.new_node_metadata(Stream::<
331                    (),
332                    Self,
333                    Unbounded,
334                    TotalOrder,
335                    ExactlyOnce,
336                >::collection_kind()),
337            },
338        )
339    }
340
341    /// Creates a stream from an async [`FuturesStream`].
342    ///
343    /// This is useful for integrating with external async data sources,
344    /// such as network connections or file readers.
345    ///
346    /// # Example
347    /// ```rust
348    /// # #[cfg(feature = "deploy")] {
349    /// # use hydro_lang::prelude::*;
350    /// # use futures::StreamExt;
351    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
352    /// process.source_stream(q!(futures::stream::iter(vec![1, 2, 3])))
353    /// # }, |mut stream| async move {
354    /// // 1, 2, 3
355    /// # for w in vec![1, 2, 3] {
356    /// #     assert_eq!(stream.next().await.unwrap(), w);
357    /// # }
358    /// # }));
359    /// # }
360    /// ```
361    fn source_stream<T, E>(
362        &self,
363        e: impl QuotedWithContext<'a, E, Self>,
364    ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
365    where
366        E: FuturesStream<Item = T> + Unpin,
367        Self: TopLevel<'a> + Sized,
368    {
369        let e = e.splice_untyped_ctx(self);
370
371        let target_location = self.drop_consistency();
372        Stream::new(
373            target_location.clone(),
374            HydroNode::Source {
375                source: HydroSource::Stream(e.into()),
376                metadata: target_location.new_node_metadata(Stream::<
377                    T,
378                    Self::DropConsistency,
379                    Unbounded,
380                    TotalOrder,
381                    ExactlyOnce,
382                >::collection_kind()),
383            },
384        )
385    }
386
387    /// Creates a bounded stream from an iterator.
388    ///
389    /// The iterator is evaluated once at runtime, and all elements are emitted
390    /// in order. This is useful for creating streams from static data or
391    /// for testing.
392    ///
393    /// # Example
394    /// ```rust
395    /// # #[cfg(feature = "deploy")] {
396    /// # use hydro_lang::prelude::*;
397    /// # use futures::StreamExt;
398    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
399    /// process.source_iter(q!(vec![1, 2, 3, 4]))
400    /// # }, |mut stream| async move {
401    /// // 1, 2, 3, 4
402    /// # for w in vec![1, 2, 3, 4] {
403    /// #     assert_eq!(stream.next().await.unwrap(), w);
404    /// # }
405    /// # }));
406    /// # }
407    /// ```
408    fn source_iter<T, E>(
409        &self,
410        e: impl QuotedWithContext<'a, E, Self>,
411    ) -> Stream<T, Self::DropConsistency, Bounded, TotalOrder, ExactlyOnce>
412    where
413        E: IntoIterator<Item = T>,
414        Self: Sized,
415    {
416        let e = e.splice_typed_ctx(self);
417
418        let target_location = self.drop_consistency();
419        Stream::new(
420            target_location.clone(),
421            HydroNode::Source {
422                source: HydroSource::Iter(e.into()),
423                metadata: target_location.new_node_metadata(Stream::<
424                    T,
425                    Self::DropConsistency,
426                    Bounded,
427                    TotalOrder,
428                    ExactlyOnce,
429                >::collection_kind()),
430            },
431        )
432    }
433
434    #[deprecated(note = "use .source_cluster_membership_stream(...) instead")]
435    /// Creates a stream of membership events for a cluster.
436    ///
437    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
438    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
439    /// keyed by the [`MemberId`] of the cluster member.
440    ///
441    /// This is useful for implementing protocols that need to track cluster membership,
442    /// such as broadcasting to all members or detecting failures.
443    ///
444    /// # Non-Determinism
    /// This stream is non-deterministic because it depends on the timing of membership
    /// events. For example, if a node leaves before the stream is created, its membership
    /// events may never be received.
448    ///
449    /// # Example
450    /// ```rust
451    /// # #[cfg(feature = "deploy")] {
452    /// # use hydro_lang::prelude::*;
453    /// # use futures::StreamExt;
454    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
455    /// let p1 = flow.process::<()>();
456    /// let workers: Cluster<()> = flow.cluster::<()>();
457    /// # // do nothing on each worker
458    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
459    /// let cluster_members = p1.source_cluster_members(&workers, nondet!(/** late joiners may miss events */));
460    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
461    /// // if there are 4 members in the cluster, we would see a join event for each
    /// // { MemberId::<Worker>(0): [MembershipEvent::Joined], MemberId::<Worker>(2): [MembershipEvent::Joined], ... }
463    /// # }, |mut stream| async move {
464    /// # let mut results = Vec::new();
465    /// # for w in 0..4 {
466    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
467    /// # }
468    /// # results.sort();
469    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
470    /// # }));
471    /// # }
472    /// ```
    fn source_cluster_members<C: 'a>(
        &self,
        cluster: &Cluster<'a, C>,
        nondet_start: NonDet,
    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
    where
        Self: TopLevel<'a> + Sized,
    {
        // Deprecated alias kept for existing callers: forwards both arguments
        // unchanged to the replacement method.
        self.source_cluster_membership_stream(cluster, nondet_start)
    }
483
484    /// Creates a stream of membership events for a cluster.
485    ///
486    /// This stream emits [`MembershipEvent::Joined`] when a cluster member joins
487    /// and [`MembershipEvent::Left`] when a cluster member leaves. The stream is
488    /// keyed by the [`MemberId`] of the cluster member.
489    ///
490    /// This is useful for implementing protocols that need to track cluster membership,
491    /// such as broadcasting to all members or detecting failures.
492    ///
493    /// # Non-Determinism
    /// This stream is non-deterministic because it depends on the timing of membership
    /// events. For example, if a node leaves before the stream is created, its membership
    /// events may never be received.
497    ///
498    /// # Example
499    /// ```rust
500    /// # #[cfg(feature = "deploy")] {
501    /// # use hydro_lang::prelude::*;
502    /// # use futures::StreamExt;
503    /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
504    /// let p1 = flow.process::<()>();
505    /// let workers: Cluster<()> = flow.cluster::<()>();
506    /// # // do nothing on each worker
507    /// # workers.source_iter(q!(vec![])).for_each(q!(|_: ()| {}));
508    /// let cluster_members = p1.source_cluster_membership_stream(&workers, nondet!(/** late joiners may miss events */));
509    /// # cluster_members.entries().send(&p2, TCP.fail_stop().bincode())
510    /// // if there are 4 members in the cluster, we would see a join event for each
    /// // { MemberId::<Worker>(0): [MembershipEvent::Joined], MemberId::<Worker>(2): [MembershipEvent::Joined], ... }
512    /// # }, |mut stream| async move {
513    /// # let mut results = Vec::new();
514    /// # for w in 0..4 {
515    /// #     results.push(format!("{:?}", stream.next().await.unwrap()));
516    /// # }
517    /// # results.sort();
518    /// # assert_eq!(results, vec!["(MemberId::<()>(0), Joined)", "(MemberId::<()>(1), Joined)", "(MemberId::<()>(2), Joined)", "(MemberId::<()>(3), Joined)"]);
519    /// # }));
520    /// # }
521    /// ```
522    fn source_cluster_membership_stream<C: 'a>(
523        &self,
524        cluster: &Cluster<'a, C>,
525        _nondet_start: NonDet,
526    ) -> KeyedStream<MemberId<C>, MembershipEvent, Self::DropConsistency, Unbounded>
527    where
528        Self: TopLevel<'a> + Sized,
529    {
530        let target_consistency = self.drop_consistency();
531        Stream::new(
532            target_consistency.clone(),
533            HydroNode::Source {
534                source: HydroSource::ClusterMembers(cluster.id(), ClusterMembersState::Uninit),
535                metadata: target_consistency.new_node_metadata(Stream::<
536                    (TaglessMemberId, MembershipEvent),
537                    Self,
538                    Unbounded,
539                    TotalOrder,
540                    ExactlyOnce,
541                >::collection_kind(
542                )),
543            },
544        )
545        .map(q!(|(k, v)| (MemberId::from_tagless(k), v)))
546        .into_keyed()
547    }
548
549    /// Creates a one-way connection from an external process to receive raw bytes.
550    ///
551    /// Returns a port handle for the external process to connect to, and a stream
552    /// of received byte buffers.
553    ///
554    /// For bidirectional communication or typed data, see [`Location::bind_single_client`]
555    /// or [`Location::source_external_bincode`].
556    #[cfg(feature = "tokio")]
557    fn source_external_bytes<L>(
558        &self,
559        from: &External<L>,
560    ) -> (
561        ExternalBytesPort,
562        Stream<BytesMut, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
563    )
564    where
565        Self: TopLevel<'a> + Sized,
566    {
567        let (port, stream, sink) =
568            self.bind_single_client::<_, Bytes, LengthDelimitedCodec>(from, NetworkHint::Auto);
569
570        sink.complete(stream.location().source_iter(q!([])));
571
572        (port, stream)
573    }
574
575    /// Creates a one-way connection from an external process to receive bincode-serialized data.
576    ///
577    /// Returns a sink handle for the external process to send data to, and a stream
578    /// of received values.
579    ///
580    /// For bidirectional communication, see [`Location::bind_single_client_bincode`].
581    #[cfg(feature = "tokio")]
582    fn source_external_bincode<L, T, O: Ordering, R: Retries>(
583        &self,
584        from: &External<L>,
585    ) -> (
586        ExternalBincodeSink<T, NotMany, O, R>,
587        Stream<T, Self::DropConsistency, Unbounded, O, R>,
588    )
589    where
590        Self: TopLevel<'a> + Sized,
591        T: Serialize + DeserializeOwned,
592    {
593        let (port, stream, sink) = self.bind_single_client_bincode::<_, T, ()>(from);
594        sink.complete(stream.location().source_iter(q!([])));
595
596        (
597            ExternalBincodeSink {
598                process_key: from.key,
599                port_id: port.port_id,
600                _phantom: PhantomData,
601            },
602            stream.weaken_ordering().weaken_retries(),
603        )
604    }
605
606    /// Sets up a simulated input port on this location for testing.
607    ///
608    /// Returns a handle to send messages to the location as well as a stream
609    /// of received messages. This is only available when the `sim` feature is enabled.
610    #[cfg(feature = "sim")]
611    fn sim_input<T, O: Ordering, R: Retries>(
612        &self,
613    ) -> (
614        SimSender<T, O, R>,
615        Stream<T, Self::DropConsistency, Unbounded, O, R>,
616    )
617    where
618        Self: TopLevel<'a> + Sized,
619        T: Serialize + DeserializeOwned,
620    {
621        let external_location: External<'a, ()> = External {
622            key: LocationKey::FIRST,
623            flow_state: self.flow_state().clone(),
624            _phantom: PhantomData,
625        };
626
627        let (external, stream) = self.source_external_bincode(&external_location);
628
629        (SimSender(external.port_id, PhantomData), stream)
630    }
631
632    /// Creates an external input stream for embedded deployment mode.
633    ///
634    /// The `name` parameter specifies the name of the generated function parameter
635    /// that will supply data to this stream at runtime. The generated function will
636    /// accept an `impl Stream<Item = T> + Unpin` argument with this name.
637    fn embedded_input<T>(
638        &self,
639        name: impl Into<String>,
640    ) -> Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>
641    where
642        Self: TopLevel<'a> + Sized,
643    {
644        let ident = syn::Ident::new(&name.into(), Span::call_site());
645
646        let target_location = self.drop_consistency();
647        Stream::new(
648            target_location.clone(),
649            HydroNode::Source {
650                source: HydroSource::Embedded(ident),
651                metadata: target_location.new_node_metadata(Stream::<
652                    T,
653                    Self,
654                    Unbounded,
655                    TotalOrder,
656                    ExactlyOnce,
657                >::collection_kind()),
658            },
659        )
660    }
661
662    /// Creates an embedded singleton input for embedded deployment mode.
663    ///
664    /// The `name` parameter specifies the name of the generated function parameter
665    /// that will supply data to this singleton at runtime. The generated function will
666    /// accept a plain `T` parameter with this name.
667    fn embedded_singleton_input<T>(
668        &self,
669        name: impl Into<String>,
670    ) -> Singleton<T, Self::DropConsistency, Bounded>
671    where
672        Self: TopLevel<'a> + Sized,
673    {
674        let ident = syn::Ident::new(&name.into(), Span::call_site());
675
676        let target_location = self.drop_consistency();
677        Singleton::new(
678            target_location.clone(),
679            HydroNode::Source {
680                source: HydroSource::EmbeddedSingleton(ident),
681                metadata: target_location
682                    .new_node_metadata(Singleton::<T, Self, Bounded>::collection_kind()),
683            },
684        )
685    }
686
687    /// Establishes a server on this location to receive a bidirectional connection from a single
688    /// client, identified by the given `External` handle. Returns a port handle for the external
689    /// process to connect to, a stream of incoming messages, and a handle to send outgoing
690    /// messages.
691    ///
692    /// # Example
693    /// ```rust
694    /// # #[cfg(feature = "deploy")] {
695    /// # use hydro_lang::prelude::*;
696    /// # use hydro_deploy::Deployment;
697    /// # use futures::{SinkExt, StreamExt};
698    /// # tokio_test::block_on(async {
699    /// # use bytes::Bytes;
700    /// # use hydro_lang::location::NetworkHint;
701    /// # use tokio_util::codec::LengthDelimitedCodec;
702    /// # let mut flow = FlowBuilder::new();
703    /// let node = flow.process::<()>();
704    /// let external = flow.external::<()>();
705    /// let (port, incoming, outgoing) =
706    ///     node.bind_single_client::<_, Bytes, LengthDelimitedCodec>(&external, NetworkHint::Auto);
707    /// outgoing.complete(incoming.map(q!(|data /* : Bytes */| {
708    ///     let mut resp: Vec<u8> = data.into();
709    ///     resp.push(42);
710    ///     resp.into() // : Bytes
711    /// })));
712    ///
713    /// # let mut deployment = Deployment::new();
714    /// let nodes = flow // ... with_process and with_external
715    /// #     .with_process(&node, deployment.Localhost())
716    /// #     .with_external(&external, deployment.Localhost())
717    /// #     .deploy(&mut deployment);
718    ///
719    /// deployment.deploy().await.unwrap();
720    /// deployment.start().await.unwrap();
721    ///
722    /// let (mut external_out, mut external_in) = nodes.connect(port).await;
723    /// external_in.send(vec![1, 2, 3].into()).await.unwrap();
724    /// assert_eq!(
725    ///     external_out.next().await.unwrap().unwrap(),
726    ///     vec![1, 2, 3, 42]
727    /// );
728    /// # });
729    /// # }
730    /// ```
731    #[cfg(feature = "tokio")]
732    #[expect(clippy::type_complexity, reason = "stream markers")]
    fn bind_single_client<L, T, Codec: Encoder<T> + Decoder>(
        &self,
        from: &External<L>,
        port_hint: NetworkHint,
    ) -> (
        ExternalBytesPort<NotMany>,
        Stream<<Codec as Decoder>::Item, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
        ForwardHandle<'a, Stream<T, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
    )
    where
        Self: TopLevel<'a> + Sized,
    {
        // Allocate one port id from the external's flow state; the same id is
        // used for both the outgoing root and the incoming node below.
        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
        let target_consistency = self.drop_consistency();

        // The outgoing stream does not exist yet: hand the caller a forward
        // handle (`fwd_ref`) and keep the matching stream (`to_sink`) to wire
        // into the send root.
        let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
            T,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        >>();
        // NOTE(review): this mutable borrow of the flow state is held until the
        // end of the function, and is taken only after `forward_ref` has run —
        // keep this ordering when editing.
        let mut flow_state_borrow = self.flow_state().borrow_mut();

        // Outgoing half: register a root that sends `to_sink`'s IR to the
        // external. The IR node is moved out of `to_sink`, leaving a
        // `Placeholder` behind.
        flow_state_borrow.push_root(HydroRoot::SendExternal {
            to_external_key: from.key,
            to_port_id: next_external_port_id,
            to_many: false,
            unpaired: false,
            serialize_fn: None,
            instantiate_fn: DebugInstantiate::Building,
            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
            op_metadata: HydroIrOpMetadata::new(),
        });

        // Incoming half: each element is the codec's decode result for one frame.
        let raw_stream: Stream<
            Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
            Self::DropConsistency,
            Unbounded,
            TotalOrder,
            ExactlyOnce,
        > = Stream::new(
            target_consistency.clone(),
            HydroNode::ExternalInput {
                from_external_key: from.key,
                from_port_id: next_external_port_id,
                from_many: false,
                codec_type: quote_type::<Codec>().into(),
                port_hint,
                instantiate_fn: DebugInstantiate::Building,
                deserialize_fn: None,
                metadata: target_consistency.new_node_metadata(Stream::<
                    Result<<Codec as Decoder>::Item, <Codec as Decoder>::Error>,
                    Self::DropConsistency,
                    Unbounded,
                    TotalOrder,
                    ExactlyOnce,
                >::collection_kind(
                )),
            },
        );

        (
            ExternalBytesPort {
                process_key: from.key,
                port_id: next_external_port_id,
                _phantom: PhantomData,
            },
            // Flattens `Result<Item, Error>` into `Item`; presumably `Err`
            // values are dropped (as with `Result`'s `IntoIterator`) — TODO confirm.
            raw_stream.flatten_ordered(),
            fwd_ref,
        )
    }
805
806    /// Establishes a bidirectional connection from a single external client using bincode serialization.
807    ///
808    /// Returns a port handle for the external process to connect to, a stream of incoming messages,
809    /// and a handle to send outgoing messages. This is a convenience wrapper around
810    /// [`Location::bind_single_client`] that uses bincode for serialization.
811    ///
812    /// # Type Parameters
813    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
814    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
815    #[cfg(feature = "tokio")]
816    #[expect(clippy::type_complexity, reason = "stream markers")]
817    fn bind_single_client_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
818        &self,
819        from: &External<L>,
820    ) -> (
821        ExternalBincodeBidi<InT, OutT, NotMany>,
822        Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
823        ForwardHandle<'a, Stream<OutT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>>,
824    )
825    where
826        Self: TopLevel<'a> + Sized,
827    {
828        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
829
830        let target_consistency = self.drop_consistency();
831        let (fwd_ref, to_sink) = target_consistency.forward_ref::<Stream<
832            OutT,
833            Self::DropConsistency,
834            Unbounded,
835            TotalOrder,
836            ExactlyOnce,
837        >>();
838        let mut flow_state_borrow = self.flow_state().borrow_mut();
839
840        let root = get_this_crate();
841
842        let out_t_type = quote_type::<OutT>();
843        let ser_fn: syn::Expr = syn::parse_quote! {
844            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#out_t_type, _>(
845                |b| #root::runtime_support::bincode::serialize(&b).unwrap().into()
846            )
847        };
848
849        flow_state_borrow.push_root(HydroRoot::SendExternal {
850            to_external_key: from.key,
851            to_port_id: next_external_port_id,
852            to_many: false,
853            unpaired: false,
854            serialize_fn: Some(ser_fn.into()),
855            instantiate_fn: DebugInstantiate::Building,
856            input: Box::new(to_sink.ir_node.replace(HydroNode::Placeholder)),
857            op_metadata: HydroIrOpMetadata::new(),
858        });
859
860        let in_t_type = quote_type::<InT>();
861
862        let deser_fn: syn::Expr = syn::parse_quote! {
863            |res| {
864                let b = res.unwrap();
865                #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap()
866            }
867        };
868
869        let raw_stream: Stream<InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce> =
870            Stream::new(
871                target_consistency.clone(),
872                HydroNode::ExternalInput {
873                    from_external_key: from.key,
874                    from_port_id: next_external_port_id,
875                    from_many: false,
876                    codec_type: quote_type::<LengthDelimitedCodec>().into(),
877                    port_hint: NetworkHint::Auto,
878                    instantiate_fn: DebugInstantiate::Building,
879                    deserialize_fn: Some(deser_fn.into()),
880                    metadata: target_consistency.new_node_metadata(Stream::<
881                        InT,
882                        Self::DropConsistency,
883                        Unbounded,
884                        TotalOrder,
885                        ExactlyOnce,
886                    >::collection_kind(
887                    )),
888                },
889            );
890
891        (
892            ExternalBincodeBidi {
893                process_key: from.key,
894                port_id: next_external_port_id,
895                _phantom: PhantomData,
896            },
897            raw_stream,
898            fwd_ref,
899        )
900    }
901
902    /// Establishes a server on this location to receive bidirectional connections from multiple
903    /// external clients using raw bytes.
904    ///
905    /// Unlike [`Location::bind_single_client`], this method supports multiple concurrent client
906    /// connections. Each client is assigned a unique `u64` identifier.
907    ///
908    /// Returns:
909    /// - A port handle for external processes to connect to
910    /// - A keyed stream of incoming messages, keyed by client ID
911    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
912    /// - A handle to send outgoing messages, keyed by client ID
913    #[cfg(feature = "tokio")]
914    #[expect(clippy::type_complexity, reason = "stream markers")]
915    fn bidi_external_many_bytes<L, T, Codec: Encoder<T> + Decoder>(
916        &self,
917        from: &External<L>,
918        port_hint: NetworkHint,
919    ) -> (
920        ExternalBytesPort<Many>,
921        KeyedStream<
922            u64,
923            <Codec as Decoder>::Item,
924            Self::DropConsistency,
925            Unbounded,
926            TotalOrder,
927            ExactlyOnce,
928        >,
929        KeyedStream<
930            u64,
931            MembershipEvent,
932            Self::DropConsistency,
933            Unbounded,
934            TotalOrder,
935            ExactlyOnce,
936        >,
937        ForwardHandle<
938            'a,
939            KeyedStream<u64, T, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
940        >,
941    )
942    where
943        Self: TopLevel<'a> + Sized,
944    {
945        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
946
947        let target_consistency = self.drop_consistency();
948        let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
949            u64,
950            T,
951            Self::DropConsistency,
952            Unbounded,
953            NoOrder,
954            ExactlyOnce,
955        >>();
956        let mut flow_state_borrow = self.flow_state().borrow_mut();
957
958        flow_state_borrow.push_root(HydroRoot::SendExternal {
959            to_external_key: from.key,
960            to_port_id: next_external_port_id,
961            to_many: true,
962            unpaired: false,
963            serialize_fn: None,
964            instantiate_fn: DebugInstantiate::Building,
965            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
966            op_metadata: HydroIrOpMetadata::new(),
967        });
968
969        let raw_stream: Stream<
970            Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
971            Self::DropConsistency,
972            Unbounded,
973            TotalOrder,
974            ExactlyOnce,
975        > = Stream::new(
976            target_consistency.clone(),
977            HydroNode::ExternalInput {
978                from_external_key: from.key,
979                from_port_id: next_external_port_id,
980                from_many: true,
981                codec_type: quote_type::<Codec>().into(),
982                port_hint,
983                instantiate_fn: DebugInstantiate::Building,
984                deserialize_fn: None,
985                metadata: target_consistency.new_node_metadata(Stream::<
986                    Result<(u64, <Codec as Decoder>::Item), <Codec as Decoder>::Error>,
987                    Self::DropConsistency,
988                    Unbounded,
989                    TotalOrder,
990                    ExactlyOnce,
991                >::collection_kind(
992                )),
993            },
994        );
995
996        let membership_stream_ident = syn::Ident::new(
997            &format!(
998                "__hydro_deploy_many_{}_{}_membership",
999                from.key, next_external_port_id
1000            ),
1001            Span::call_site(),
1002        );
1003        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
1004        let raw_membership_stream: KeyedStream<
1005            u64,
1006            bool,
1007            Self::DropConsistency,
1008            Unbounded,
1009            TotalOrder,
1010            ExactlyOnce,
1011        > = KeyedStream::new(
1012            target_consistency.clone(),
1013            HydroNode::Source {
1014                source: HydroSource::Stream(membership_stream_expr.into()),
1015                metadata: target_consistency.new_node_metadata(KeyedStream::<
1016                    u64,
1017                    bool,
1018                    Self::DropConsistency,
1019                    Unbounded,
1020                    TotalOrder,
1021                    ExactlyOnce,
1022                >::collection_kind(
1023                )),
1024            },
1025        );
1026
1027        (
1028            ExternalBytesPort {
1029                process_key: from.key,
1030                port_id: next_external_port_id,
1031                _phantom: PhantomData,
1032            },
1033            raw_stream
1034                .flatten_ordered() // TODO(shadaj): this silently drops framing errors, decide on right defaults
1035                .into_keyed(),
1036            raw_membership_stream.map(q!(|join| {
1037                if join {
1038                    MembershipEvent::Joined
1039                } else {
1040                    MembershipEvent::Left
1041                }
1042            })),
1043            fwd_ref,
1044        )
1045    }
1046
1047    /// Establishes a server on this location to receive bidirectional connections from multiple
1048    /// external clients using bincode serialization.
1049    ///
1050    /// Unlike [`Location::bind_single_client_bincode`], this method supports multiple concurrent
1051    /// client connections. Each client is assigned a unique `u64` identifier.
1052    ///
1053    /// Returns:
1054    /// - A port handle for external processes to connect to
1055    /// - A keyed stream of incoming messages, keyed by client ID
1056    /// - A keyed stream of membership events (client joins/leaves), keyed by client ID
1057    /// - A handle to send outgoing messages, keyed by client ID
1058    ///
1059    /// # Type Parameters
1060    /// - `InT`: The type of incoming messages (must implement [`DeserializeOwned`])
1061    /// - `OutT`: The type of outgoing messages (must implement [`Serialize`])
1062    #[cfg(feature = "tokio")]
1063    #[expect(clippy::type_complexity, reason = "stream markers")]
1064    fn bidi_external_many_bincode<L, InT: DeserializeOwned, OutT: Serialize>(
1065        &self,
1066        from: &External<L>,
1067    ) -> (
1068        ExternalBincodeBidi<InT, OutT, Many>,
1069        KeyedStream<u64, InT, Self::DropConsistency, Unbounded, TotalOrder, ExactlyOnce>,
1070        KeyedStream<
1071            u64,
1072            MembershipEvent,
1073            Self::DropConsistency,
1074            Unbounded,
1075            TotalOrder,
1076            ExactlyOnce,
1077        >,
1078        ForwardHandle<
1079            'a,
1080            KeyedStream<u64, OutT, Self::DropConsistency, Unbounded, NoOrder, ExactlyOnce>,
1081        >,
1082    )
1083    where
1084        Self: TopLevel<'a> + Sized,
1085    {
1086        let next_external_port_id = from.flow_state.borrow_mut().next_external_port();
1087
1088        let target_consistency = self.drop_consistency();
1089        let (fwd_ref, to_sink) = target_consistency.forward_ref::<KeyedStream<
1090            u64,
1091            OutT,
1092            Self::DropConsistency,
1093            Unbounded,
1094            NoOrder,
1095            ExactlyOnce,
1096        >>();
1097        let mut flow_state_borrow = self.flow_state().borrow_mut();
1098
1099        let root = get_this_crate();
1100
1101        let out_t_type = quote_type::<OutT>();
1102        let ser_fn: syn::Expr = syn::parse_quote! {
1103            #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(u64, #out_t_type), _>(
1104                |(id, b)| (id, #root::runtime_support::bincode::serialize(&b).unwrap().into())
1105            )
1106        };
1107
1108        flow_state_borrow.push_root(HydroRoot::SendExternal {
1109            to_external_key: from.key,
1110            to_port_id: next_external_port_id,
1111            to_many: true,
1112            unpaired: false,
1113            serialize_fn: Some(ser_fn.into()),
1114            instantiate_fn: DebugInstantiate::Building,
1115            input: Box::new(to_sink.entries().ir_node.replace(HydroNode::Placeholder)),
1116            op_metadata: HydroIrOpMetadata::new(),
1117        });
1118
1119        let in_t_type = quote_type::<InT>();
1120
1121        let deser_fn: syn::Expr = syn::parse_quote! {
1122            |res| {
1123                let (id, b) = res.unwrap();
1124                (id, #root::runtime_support::bincode::deserialize::<#in_t_type>(&b).unwrap())
1125            }
1126        };
1127
1128        let raw_stream: KeyedStream<
1129            u64,
1130            InT,
1131            Self::DropConsistency,
1132            Unbounded,
1133            TotalOrder,
1134            ExactlyOnce,
1135        > = KeyedStream::new(
1136            target_consistency.clone(),
1137            HydroNode::ExternalInput {
1138                from_external_key: from.key,
1139                from_port_id: next_external_port_id,
1140                from_many: true,
1141                codec_type: quote_type::<LengthDelimitedCodec>().into(),
1142                port_hint: NetworkHint::Auto,
1143                instantiate_fn: DebugInstantiate::Building,
1144                deserialize_fn: Some(deser_fn.into()),
1145                metadata: target_consistency.new_node_metadata(KeyedStream::<
1146                    u64,
1147                    InT,
1148                    Self::DropConsistency,
1149                    Unbounded,
1150                    TotalOrder,
1151                    ExactlyOnce,
1152                >::collection_kind(
1153                )),
1154            },
1155        );
1156
1157        let membership_stream_ident = syn::Ident::new(
1158            &format!(
1159                "__hydro_deploy_many_{}_{}_membership",
1160                from.key, next_external_port_id
1161            ),
1162            Span::call_site(),
1163        );
1164        let membership_stream_expr: syn::Expr = parse_quote!(#membership_stream_ident);
1165        let raw_membership_stream: KeyedStream<
1166            u64,
1167            bool,
1168            Self::DropConsistency,
1169            Unbounded,
1170            TotalOrder,
1171            ExactlyOnce,
1172        > = KeyedStream::new(
1173            target_consistency.clone(),
1174            HydroNode::Source {
1175                source: HydroSource::Stream(membership_stream_expr.into()),
1176                metadata: target_consistency.new_node_metadata(KeyedStream::<
1177                    u64,
1178                    bool,
1179                    Self::DropConsistency,
1180                    Unbounded,
1181                    TotalOrder,
1182                    ExactlyOnce,
1183                >::collection_kind(
1184                )),
1185            },
1186        );
1187
1188        (
1189            ExternalBincodeBidi {
1190                process_key: from.key,
1191                port_id: next_external_port_id,
1192                _phantom: PhantomData,
1193            },
1194            raw_stream,
1195            raw_membership_stream.map(q!(|join| {
1196                if join {
1197                    MembershipEvent::Joined
1198                } else {
1199                    MembershipEvent::Left
1200                }
1201            })),
1202            fwd_ref,
1203        )
1204    }
1205
1206    /// Bridges user-owned async code to the dataflow as a **bidirectional sidecar**.
1207    ///
1208    /// The closure is called once at startup and must return a
1209    /// `(Stream<InT>, Sink<OutT>)` pair. The framework reads from the stream
1210    /// (items flowing *into* the dataflow) and writes to the sink (items flowing
1211    /// *out* to the sidecar). The user controls buffering, backpressure, and
1212    /// internal lifecycle — Hydro only sees the stream/sink interface.
1213    ///
1214    /// This will hopefully make it easy to integrate hydro with existing frameworks,
1215    /// for example grpc code generated service endpoints.
1216    ///
1217    /// # Returns
1218    /// - A `Stream<InT>` carrying items from the sidecar into the dataflow.
1219    /// - A [`ForwardHandle`] expecting a `Stream<OutT>` that the user completes
1220    ///   with items destined for the sidecar.
1221    ///
1222    /// # Example
1223    ///
1224    /// ```rust
1225    /// # #[cfg(feature = "deploy")] {
1226    /// # use hydro_lang::prelude::*;
1227    /// # use futures::StreamExt;
1228    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1229    /// // Sidecar that echoes whatever it receives back into the dataflow.
1230    /// let (inbound, response_handle) = process.sidecar_bidi::<String, String, _>(q!(|| {
1231    ///     let (to_df_tx, to_df_rx) = tokio::sync::mpsc::channel::<String>(16);
1232    ///     let (from_df_tx, mut from_df_rx) = tokio::sync::mpsc::channel::<String>(16);
1233    ///
1234    ///     // Spawn the sidecar: echoes items from the dataflow back into it.
1235    ///     tokio::spawn(async move {
1236    ///         while let Some(msg) = from_df_rx.recv().await {
1237    ///             to_df_tx.send(msg).await.ok();
1238    ///         }
1239    ///     });
1240    ///
1241    ///     // Return the framework-facing ends (concrete types, no boxing needed).
1242    ///     let stream = tokio_stream::wrappers::ReceiverStream::new(to_df_rx);
1243    ///     let sink = tokio_util::sync::PollSender::new(from_df_tx);
1244    ///     (stream, sink)
1245    /// }));
1246    ///
1247    /// // Send "hello" into the sidecar via the response channel.
1248    /// let input = process.source_stream(q!(futures::stream::iter(vec!["hello".to_string()])));
1249    /// response_handle.complete(input);
1250    ///
1251    /// // The sidecar echoes it back — assert we get "hello" out.
1252    /// inbound
1253    /// # }, |mut stream| async move {
1254    /// #     assert_eq!(stream.next().await.unwrap(), "hello");
1255    /// # }));
1256    /// # }
1257    /// ```
1258    fn sidecar_bidi<InT: 'static, OutT: 'static, F>(
1259        &self,
1260        sidecar: impl QuotedWithContext<'a, F, Self>,
1261    ) -> (
1262        Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce>,
1263        ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1264    )
1265    where
1266        Self: Sized + TopLevel<'a>,
1267    {
1268        let location_key = Location::id(self).key();
1269
1270        let sidecar_id = self.flow_state().borrow_mut().next_sidecar_id();
1271        let (stream_ident, sink_ident) = sidecar_id.idents();
1272
1273        let sidecar_closure: syn::Expr = sidecar.splice_untyped_ctx(self);
1274        self.flow_state()
1275            .borrow_mut()
1276            .sidecars
1277            .push(crate::compile::builder::Sidecar::Bidi {
1278                location_key,
1279                sidecar_id,
1280                sidecar_closure: Box::new(sidecar_closure),
1281            });
1282
1283        // Inbound stream: reads from the stream returned by the sidecar closure
1284        let source_expr: syn::Expr = parse_quote! {
1285            #stream_ident
1286        };
1287        let inbound: Stream<InT, Self, Unbounded, TotalOrder, ExactlyOnce> = Stream::new(
1288            self.clone(),
1289            HydroNode::Source {
1290                source: HydroSource::Stream(source_expr.into()),
1291                metadata: self.new_node_metadata(Stream::<
1292                    InT,
1293                    Self,
1294                    Unbounded,  // TODO: maybe bounded sidecars are interesting..?
1295                    TotalOrder, // TODO: NoOrder..?
1296                    ExactlyOnce,
1297                >::collection_kind()),
1298            },
1299        );
1300
1301        // Outbound: forward_ref cycle feeding the sink returned by the sidecar closure
1302        let (fwd_ref, to_sink): (
1303            ForwardHandle<'a, Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>>,
1304            Stream<OutT, Self, Unbounded, NoOrder, ExactlyOnce>,
1305        ) = self.forward_ref();
1306
1307        let sink_expr: syn::Expr = parse_quote! {
1308            #sink_ident
1309        };
1310
1311        let sink_input_ir = to_sink.ir_node.replace(HydroNode::Placeholder);
1312        self.flow_state()
1313            .borrow_mut()
1314            .try_push_root(HydroRoot::DestSink {
1315                sink: sink_expr.into(),
1316                input: Box::new(sink_input_ir),
1317                op_metadata: HydroIrOpMetadata::new(),
1318            });
1319
1320        (inbound, fwd_ref)
1321    }
1322
1323    /// Constructs a [`Singleton`] materialized at this location with the given static value.
1324    ///
1325    /// See also: [`Tick::singleton`], for creating a singleton _within_ a tick, which requires
1326    /// `T: Clone`.
1327    ///
1328    /// # Example
1329    /// ```rust
1330    /// # #[cfg(feature = "deploy")] {
1331    /// # use hydro_lang::prelude::*;
1332    /// # use futures::StreamExt;
1333    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1334    /// let singleton = process.singleton(q!(5));
1335    /// # singleton.into_stream()
1336    /// # }, |mut stream| async move {
1337    /// // 5
1338    /// # assert_eq!(stream.next().await.unwrap(), 5);
1339    /// # }));
1340    /// # }
1341    /// ```
1342    fn singleton<T>(
1343        &self,
1344        e: impl QuotedWithContext<'a, T, Self>,
1345    ) -> Singleton<T, Self::DropConsistency, Bounded>
1346    where
1347        Self: Sized,
1348    {
1349        let e = e.splice_untyped_ctx(self);
1350
1351        let target_location = self.drop_consistency();
1352        Singleton::new(
1353            target_location.clone(),
1354            HydroNode::SingletonSource {
1355                value: e.into(),
1356                first_tick_only: false,
1357                metadata: target_location.new_node_metadata(Singleton::<
1358                    T,
1359                    Self::DropConsistency,
1360                    Bounded,
1361                >::collection_kind()),
1362            },
1363        )
1364    }
1365
1366    /// Constructs a [`Singleton`] by resolving an async [`Future`] to completion.
1367    ///
1368    /// This is a convenience method equivalent to
1369    /// `self.singleton(future_expr).resolve_future_blocking()`, which is a common
1370    /// pattern when initializing a singleton from an async computation.
1371    ///
1372    /// # Example
1373    /// ```rust
1374    /// # #[cfg(feature = "deploy")] {
1375    /// # use hydro_lang::prelude::*;
1376    /// # use futures::StreamExt;
1377    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1378    /// let singleton = process.singleton_future(q!(async { 42 }));
1379    /// singleton.into_stream()
1380    /// # }, |mut stream| async move {
1381    /// // 42
1382    /// # assert_eq!(stream.next().await.unwrap(), 42);
1383    /// # }));
1384    /// # }
1385    /// ```
1386    ///
1387    /// [`Future`]: std::future::Future
1388    fn singleton_future<F>(
1389        &self,
1390        e: impl QuotedWithContext<'a, F, Self>,
1391    ) -> Singleton<F::Output, Self::DropConsistency, Bounded>
1392    where
1393        F: Future,
1394        Self: Sized,
1395    {
1396        self.singleton(e).resolve_future_blocking()
1397    }
1398
1399    /// Generates a stream that emits `()` at a fixed interval.
1400    ///
1401    /// The first tick completes immediately. Missed ticks will be scheduled
1402    /// as soon as possible.
1403    ///
1404    /// Because this only emits `()`, the non-determinism of *when* events fire
1405    /// is captured by the `AtLeastOnce` retry semantics downstream, so no
1406    /// [`NonDet`] guard is required.
1407    #[cfg(feature = "tokio")]
1408    fn source_interval(
1409        &self,
1410        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1411    ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1412    where
1413        Self: TopLevel<'a> + Sized,
1414    {
1415        self.source_stream(q!(tokio_stream::StreamExt::map(
1416            tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(interval)),
1417            |_| ()
1418        )))
1419        .assert_has_consistency_of_trusted(
1420            manual_proof!(/** interval does not reveal timestamps */),
1421        )
1422    }
1423
1424    /// Generates a stream that emits `()` at a fixed interval, after an
1425    /// initial delay.
1426    ///
1427    /// Because this only emits `()`, the non-determinism of *when* events fire
1428    /// is captured by the `AtLeastOnce` retry semantics downstream, so no
1429    /// [`NonDet`] guard is required.
1430    #[cfg(feature = "tokio")]
1431    fn source_interval_delayed(
1432        &self,
1433        delay: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1434        interval: impl QuotedWithContext<'a, Duration, Self> + Copy + 'a,
1435    ) -> Stream<(), Self, Unbounded, TotalOrder, ExactlyOnce>
1436    where
1437        Self: TopLevel<'a> + Sized,
1438    {
1439        self.source_stream(q!(tokio_stream::StreamExt::map(
1440            tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
1441                tokio::time::Instant::now() + delay,
1442                interval,
1443            )),
1444            |_| ()
1445        )))
1446        .assert_has_consistency_of_trusted(
1447            manual_proof!(/** interval does not reveal timestamps */),
1448        )
1449    }
1450
1451    /// Creates a forward reference, allowing a stream to be used before its source is defined.
1452    ///
1453    /// Returns a `(handle, placeholder)` pair. Use the placeholder in the dataflow graph,
1454    /// then call `handle.complete(actual_stream)` to wire in the real source.
1455    ///
1456    /// This is useful for mutually-dependent dataflows or when the definition order
1457    /// doesn't match the data flow direction. For feedback loops, prefer [`Tick::cycle`]
1458    /// instead, which automatically defers values by one tick.
1459    ///
1460    /// # Panics
1461    /// Panics if the forward reference creates a synchronous cycle (i.e., the completed
1462    /// stream transitively depends on the placeholder without a `defer_tick` or network
1463    /// hop in between).
1464    ///
1465    /// # Example
1466    /// ```rust
1467    /// # #[cfg(feature = "deploy")] {
1468    /// # use hydro_lang::prelude::*;
1469    /// # use hydro_lang::live_collections::stream::NoOrder;
1470    /// # use futures::StreamExt;
1471    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1472    /// // Create a forward reference to define a stream that will be completed later
1473    /// let (complete, forward_stream) = process.forward_ref::<Stream<i32, _, _, NoOrder>>();
1474    ///
1475    /// // Use the forward reference as input to another computation
1476    /// let output: Stream<_, _, _, NoOrder> = forward_stream.map(q!(|x| x * 2));
1477    ///
1478    /// // Complete the forward reference with the actual source
1479    /// let source: Stream<_, _, Unbounded> = process.source_iter(q!([1, 2, 3])).into();
1480    /// complete.complete(source);
1481    /// output
1482    /// # }, |mut stream| async move {
1483    /// // 2, 4, 6
1484    /// # assert_eq!(stream.next().await.unwrap(), 2);
1485    /// # assert_eq!(stream.next().await.unwrap(), 4);
1486    /// # assert_eq!(stream.next().await.unwrap(), 6);
1487    /// # }));
1488    /// # }
1489    /// ```
1490    fn forward_ref<S>(&self) -> (ForwardHandle<'a, S>, S)
1491    where
1492        S: CycleCollection<'a, ForwardRef, Location = Self>,
1493    {
1494        let cycle_id = self.flow_state().borrow_mut().next_cycle_id();
1495        (
1496            ForwardHandle::new(cycle_id, Location::id(self)),
1497            S::create_source(cycle_id, self.clone()),
1498        )
1499    }
1500}
1501
1502#[cfg(feature = "deploy")]
1503#[cfg(test)]
1504mod tests {
1505    use std::collections::HashSet;
1506
1507    use futures::{SinkExt, StreamExt};
1508    use hydro_deploy::Deployment;
1509    use stageleft::q;
1510    use tokio_util::codec::LengthDelimitedCodec;
1511
1512    use crate::compile::builder::FlowBuilder;
1513    use crate::live_collections::stream::{ExactlyOnce, TotalOrder};
1514    use crate::location::{Location, NetworkHint};
1515    use crate::nondet::nondet;
1516
1517    #[tokio::test]
1518    async fn top_level_singleton_replay_cardinality() {
1519        let mut deployment = Deployment::new();
1520
1521        let mut flow = FlowBuilder::new();
1522        let node = flow.process::<()>();
1523        let external = flow.external::<()>();
1524
1525        let (in_port, input) =
1526            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1527        let singleton = node.singleton(q!(123));
1528        let tick = node.tick();
1529        let out = input
1530            .batch(&tick, nondet!(/** test */))
1531            .cross_singleton(singleton.clone().snapshot(&tick, nondet!(/** test */)))
1532            .cross_singleton(
1533                singleton
1534                    .snapshot(&tick, nondet!(/** test */))
1535                    .into_stream()
1536                    .count(),
1537            )
1538            .all_ticks()
1539            .send_bincode_external(&external);
1540
1541        let nodes = flow
1542            .with_process(&node, deployment.Localhost())
1543            .with_external(&external, deployment.Localhost())
1544            .deploy(&mut deployment);
1545
1546        deployment.deploy().await.unwrap();
1547
1548        let mut external_in = nodes.connect(in_port).await;
1549        let mut external_out = nodes.connect(out).await;
1550
1551        deployment.start().await.unwrap();
1552
1553        external_in.send(1).await.unwrap();
1554        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1555
1556        external_in.send(2).await.unwrap();
1557        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1558    }
1559
1560    #[tokio::test]
1561    async fn tick_singleton_replay_cardinality() {
1562        let mut deployment = Deployment::new();
1563
1564        let mut flow = FlowBuilder::new();
1565        let node = flow.process::<()>();
1566        let external = flow.external::<()>();
1567
1568        let (in_port, input) =
1569            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
1570        let tick = node.tick();
1571        let singleton = tick.singleton(q!(123));
1572        let out = input
1573            .batch(&tick, nondet!(/** test */))
1574            .cross_singleton(singleton.clone())
1575            .cross_singleton(singleton.into_stream().count())
1576            .all_ticks()
1577            .send_bincode_external(&external);
1578
1579        let nodes = flow
1580            .with_process(&node, deployment.Localhost())
1581            .with_external(&external, deployment.Localhost())
1582            .deploy(&mut deployment);
1583
1584        deployment.deploy().await.unwrap();
1585
1586        let mut external_in = nodes.connect(in_port).await;
1587        let mut external_out = nodes.connect(out).await;
1588
1589        deployment.start().await.unwrap();
1590
1591        external_in.send(1).await.unwrap();
1592        assert_eq!(external_out.next().await.unwrap(), ((1, 123), 1));
1593
1594        external_in.send(2).await.unwrap();
1595        assert_eq!(external_out.next().await.unwrap(), ((2, 123), 1));
1596    }
1597
1598    #[tokio::test]
1599    async fn external_bytes() {
1600        let mut deployment = Deployment::new();
1601
1602        let mut flow = FlowBuilder::new();
1603        let first_node = flow.process::<()>();
1604        let external = flow.external::<()>();
1605
1606        let (in_port, input) = first_node.source_external_bytes(&external);
1607        let out = input.send_bincode_external(&external);
1608
1609        let nodes = flow
1610            .with_process(&first_node, deployment.Localhost())
1611            .with_external(&external, deployment.Localhost())
1612            .deploy(&mut deployment);
1613
1614        deployment.deploy().await.unwrap();
1615
1616        let mut external_in = nodes.connect(in_port).await.1;
1617        let mut external_out = nodes.connect(out).await;
1618
1619        deployment.start().await.unwrap();
1620
1621        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1622
1623        assert_eq!(external_out.next().await.unwrap(), vec![1, 2, 3]);
1624    }
1625
1626    #[tokio::test]
1627    async fn multi_external_source() {
1628        let mut deployment = Deployment::new();
1629
1630        let mut flow = FlowBuilder::new();
1631        let first_node = flow.process::<()>();
1632        let external = flow.external::<()>();
1633
1634        let (in_port, input, _membership, complete_sink) =
1635            first_node.bidi_external_many_bincode(&external);
1636        let out = input.entries().send_bincode_external(&external);
1637        complete_sink.complete(
1638            first_node
1639                .source_iter::<(u64, ()), _>(q!([]))
1640                .into_keyed()
1641                .weaken_ordering(),
1642        );
1643
1644        let nodes = flow
1645            .with_process(&first_node, deployment.Localhost())
1646            .with_external(&external, deployment.Localhost())
1647            .deploy(&mut deployment);
1648
1649        deployment.deploy().await.unwrap();
1650
1651        let (_, mut external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1652        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1653        let external_out = nodes.connect(out).await;
1654
1655        deployment.start().await.unwrap();
1656
1657        external_in_1.send(123).await.unwrap();
1658        external_in_2.send(456).await.unwrap();
1659
1660        assert_eq!(
1661            external_out.take(2).collect::<HashSet<_>>().await,
1662            vec![(0, 123), (1, 456)].into_iter().collect()
1663        );
1664    }
1665
1666    #[tokio::test]
1667    async fn second_connection_only_multi_source() {
1668        let mut deployment = Deployment::new();
1669
1670        let mut flow = FlowBuilder::new();
1671        let first_node = flow.process::<()>();
1672        let external = flow.external::<()>();
1673
1674        let (in_port, input, _membership, complete_sink) =
1675            first_node.bidi_external_many_bincode(&external);
1676        let out = input.entries().send_bincode_external(&external);
1677        complete_sink.complete(
1678            first_node
1679                .source_iter::<(u64, ()), _>(q!([]))
1680                .into_keyed()
1681                .weaken_ordering(),
1682        );
1683
1684        let nodes = flow
1685            .with_process(&first_node, deployment.Localhost())
1686            .with_external(&external, deployment.Localhost())
1687            .deploy(&mut deployment);
1688
1689        deployment.deploy().await.unwrap();
1690
1691        // intentionally skipped to test stream waking logic
1692        let (_, mut _external_in_1) = nodes.connect_bincode(in_port.clone()).await;
1693        let (_, mut external_in_2) = nodes.connect_bincode(in_port).await;
1694        let mut external_out = nodes.connect(out).await;
1695
1696        deployment.start().await.unwrap();
1697
1698        external_in_2.send(456).await.unwrap();
1699
1700        assert_eq!(external_out.next().await.unwrap(), (1, 456));
1701    }
1702
1703    #[tokio::test]
1704    async fn multi_external_bytes() {
1705        let mut deployment = Deployment::new();
1706
1707        let mut flow = FlowBuilder::new();
1708        let first_node = flow.process::<()>();
1709        let external = flow.external::<()>();
1710
1711        let (in_port, input, _membership, complete_sink) = first_node
1712            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1713        let out = input.entries().send_bincode_external(&external);
1714        complete_sink.complete(
1715            first_node
1716                .source_iter(q!([]))
1717                .into_keyed()
1718                .weaken_ordering(),
1719        );
1720
1721        let nodes = flow
1722            .with_process(&first_node, deployment.Localhost())
1723            .with_external(&external, deployment.Localhost())
1724            .deploy(&mut deployment);
1725
1726        deployment.deploy().await.unwrap();
1727
1728        let mut external_in_1 = nodes.connect(in_port.clone()).await.1;
1729        let mut external_in_2 = nodes.connect(in_port).await.1;
1730        let external_out = nodes.connect(out).await;
1731
1732        deployment.start().await.unwrap();
1733
1734        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1735        external_in_2.send(vec![4, 5].into()).await.unwrap();
1736
1737        assert_eq!(
1738            external_out.take(2).collect::<HashSet<_>>().await,
1739            vec![
1740                (0, (&[1u8, 2, 3] as &[u8]).into()),
1741                (1, (&[4u8, 5] as &[u8]).into())
1742            ]
1743            .into_iter()
1744            .collect()
1745        );
1746    }
1747
1748    #[tokio::test]
1749    async fn single_client_external_bytes() {
1750        let mut deployment = Deployment::new();
1751        let mut flow = FlowBuilder::new();
1752        let first_node = flow.process::<()>();
1753        let external = flow.external::<()>();
1754        let (port, input, complete_sink) = first_node
1755            .bind_single_client::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1756        complete_sink.complete(input.map(q!(|data| {
1757            let mut resp: Vec<u8> = data.into();
1758            resp.push(42);
1759            resp.into() // : Bytes
1760        })));
1761
1762        let nodes = flow
1763            .with_process(&first_node, deployment.Localhost())
1764            .with_external(&external, deployment.Localhost())
1765            .deploy(&mut deployment);
1766
1767        deployment.deploy().await.unwrap();
1768        deployment.start().await.unwrap();
1769
1770        let (mut external_out, mut external_in) = nodes.connect(port).await;
1771
1772        external_in.send(vec![1, 2, 3].into()).await.unwrap();
1773        assert_eq!(
1774            external_out.next().await.unwrap().unwrap(),
1775            vec![1, 2, 3, 42]
1776        );
1777    }
1778
1779    #[tokio::test]
1780    async fn echo_external_bytes() {
1781        let mut deployment = Deployment::new();
1782
1783        let mut flow = FlowBuilder::new();
1784        let first_node = flow.process::<()>();
1785        let external = flow.external::<()>();
1786
1787        let (port, input, _membership, complete_sink) = first_node
1788            .bidi_external_many_bytes::<_, _, LengthDelimitedCodec>(&external, NetworkHint::Auto);
1789        complete_sink
1790            .complete(input.map(q!(|bytes| { bytes.into_iter().map(|x| x + 1).collect() })));
1791
1792        let nodes = flow
1793            .with_process(&first_node, deployment.Localhost())
1794            .with_external(&external, deployment.Localhost())
1795            .deploy(&mut deployment);
1796
1797        deployment.deploy().await.unwrap();
1798
1799        let (mut external_out_1, mut external_in_1) = nodes.connect(port.clone()).await;
1800        let (mut external_out_2, mut external_in_2) = nodes.connect(port).await;
1801
1802        deployment.start().await.unwrap();
1803
1804        external_in_1.send(vec![1, 2, 3].into()).await.unwrap();
1805        external_in_2.send(vec![4, 5].into()).await.unwrap();
1806
1807        assert_eq!(external_out_1.next().await.unwrap().unwrap(), vec![2, 3, 4]);
1808        assert_eq!(external_out_2.next().await.unwrap().unwrap(), vec![5, 6]);
1809    }
1810
1811    #[tokio::test]
1812    async fn echo_external_bincode() {
1813        let mut deployment = Deployment::new();
1814
1815        let mut flow = FlowBuilder::new();
1816        let first_node = flow.process::<()>();
1817        let external = flow.external::<()>();
1818
1819        let (port, input, _membership, complete_sink) =
1820            first_node.bidi_external_many_bincode(&external);
1821        complete_sink.complete(input.map(q!(|text: String| { text.to_uppercase() })));
1822
1823        let nodes = flow
1824            .with_process(&first_node, deployment.Localhost())
1825            .with_external(&external, deployment.Localhost())
1826            .deploy(&mut deployment);
1827
1828        deployment.deploy().await.unwrap();
1829
1830        let (mut external_out_1, mut external_in_1) = nodes.connect_bincode(port.clone()).await;
1831        let (mut external_out_2, mut external_in_2) = nodes.connect_bincode(port).await;
1832
1833        deployment.start().await.unwrap();
1834
1835        external_in_1.send("hi".to_owned()).await.unwrap();
1836        external_in_2.send("hello".to_owned()).await.unwrap();
1837
1838        assert_eq!(external_out_1.next().await.unwrap(), "HI");
1839        assert_eq!(external_out_2.next().await.unwrap(), "HELLO");
1840    }
1841
1842    #[tokio::test]
1843    async fn closure_location_name() {
1844        let mut deployment = Deployment::new();
1845        let mut flow = FlowBuilder::new();
1846
1847        enum ClosureProcess {}
1848
1849        let node = flow.process::<ClosureProcess>();
1850        let external = flow.external::<()>();
1851
1852        let (in_port, input) =
1853            node.source_external_bincode::<_, i32, TotalOrder, ExactlyOnce>(&external);
1854        let out = input.send_bincode_external(&external);
1855
1856        let nodes = flow
1857            .with_process(&node, deployment.Localhost())
1858            .with_external(&external, deployment.Localhost())
1859            .deploy(&mut deployment);
1860
1861        deployment.deploy().await.unwrap();
1862
1863        let mut external_in = nodes.connect(in_port).await;
1864        let mut external_out = nodes.connect(out).await;
1865
1866        deployment.start().await.unwrap();
1867
1868        external_in.send(42).await.unwrap();
1869        assert_eq!(external_out.next().await.unwrap(), 42);
1870    }
1871}