Skip to main content

hydro_lang/live_collections/stream/
mod.rs

1//! Definitions for the [`Stream`] live collection.
2
3use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11#[cfg(feature = "tokio")]
12use tokio::time::Instant;
13
14use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
15use super::keyed_singleton::KeyedSingleton;
16use super::keyed_stream::{Generate, KeyedStream};
17use super::optional::Optional;
18use super::singleton::Singleton;
19use crate::compile::builder::{CycleId, FlowState};
20use crate::compile::ir::{
21    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
22};
23#[cfg(stageleft_runtime)]
24use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
25use crate::forward_handle::{ForwardRef, TickCycle};
26use crate::live_collections::batch_atomic::BatchAtomic;
27use crate::live_collections::singleton::SingletonBound;
28#[cfg(stageleft_runtime)]
29use crate::location::dynamic::{DynLocation, LocationId};
30use crate::location::tick::{Atomic, DeferTick};
31use crate::location::{Location, Tick, TopLevel, check_matching_location};
32use crate::manual_expr::ManualExpr;
33use crate::nondet::{NonDet, nondet};
34use crate::prelude::manual_proof;
35use crate::properties::{
36    AggFuncAlgebra, ApplyMonotoneStream, StreamMapFuncAlgebra, ValidCommutativityFor,
37    ValidIdempotenceFor, ValidMutBorrowCommutativityFor, ValidMutBorrowIdempotenceFor,
38    ValidMutCommutativityFor, ValidMutIdempotenceFor,
39};
40
41pub mod networking;
42
43/// A trait implemented by valid ordering markers ([`TotalOrder`] and [`NoOrder`]).
44#[sealed::sealed]
45pub trait Ordering:
46    MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
47{
48    /// The [`StreamOrder`] corresponding to this type.
49    const ORDERING_KIND: StreamOrder;
50}
51
52/// Marks the stream as being totally ordered, which means that there are
53/// no sources of non-determinism (other than intentional ones) that will
54/// affect the order of elements.
55pub enum TotalOrder {}
56
57#[sealed::sealed]
58impl Ordering for TotalOrder {
59    const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
60}
61
62/// Marks the stream as having no order, which means that the order of
63/// elements may be affected by non-determinism.
64///
65/// This restricts certain operators, such as `fold` and `reduce`, to only
66/// be used with commutative aggregation functions.
67pub enum NoOrder {}
68
69#[sealed::sealed]
70impl Ordering for NoOrder {
71    const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
72}
73
74/// Marker trait for an [`Ordering`] that is available when `Self` is a weaker guarantee than
75/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
76/// have `Self` guarantees instead.
77#[sealed::sealed]
78pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
79#[sealed::sealed]
80impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
81
82/// Helper trait for determining the weakest of two orderings.
83#[sealed::sealed]
84pub trait MinOrder<Other: ?Sized> {
85    /// The weaker of the two orderings.
86    type Min: Ordering;
87}
88
89#[sealed::sealed]
90impl<O: Ordering> MinOrder<O> for TotalOrder {
91    type Min = O;
92}
93
94#[sealed::sealed]
95impl<O: Ordering> MinOrder<O> for NoOrder {
96    type Min = NoOrder;
97}
98
99/// A trait implemented by valid retries markers ([`ExactlyOnce`] and [`AtLeastOnce`]).
100#[sealed::sealed]
101pub trait Retries:
102    MinRetries<Self, Min = Self>
103    + MinRetries<ExactlyOnce, Min = Self>
104    + MinRetries<AtLeastOnce, Min = AtLeastOnce>
105{
106    /// The [`StreamRetry`] corresponding to this type.
107    const RETRIES_KIND: StreamRetry;
108}
109
110/// Marks the stream as having deterministic message cardinality, with no
111/// possibility of duplicates.
112pub enum ExactlyOnce {}
113
114#[sealed::sealed]
115impl Retries for ExactlyOnce {
116    const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
117}
118
119/// Marks the stream as having non-deterministic message cardinality, which
120/// means that duplicates may occur, but messages will not be dropped.
121pub enum AtLeastOnce {}
122
123#[sealed::sealed]
124impl Retries for AtLeastOnce {
125    const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
126}
127
128/// Marker trait for a [`Retries`] that is available when `Self` is a weaker guarantee than
129/// `Other`, which means that a stream with `Other` guarantees can be safely converted to
130/// have `Self` guarantees instead.
131#[sealed::sealed]
132pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
133#[sealed::sealed]
134impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
135
136/// Helper trait for determining the weakest of two retry guarantees.
137#[sealed::sealed]
138pub trait MinRetries<Other: ?Sized> {
139    /// The weaker of the two retry guarantees.
140    type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
141}
142
143#[sealed::sealed]
144impl<R: Retries> MinRetries<R> for ExactlyOnce {
145    type Min = R;
146}
147
148#[sealed::sealed]
149impl<R: Retries> MinRetries<R> for AtLeastOnce {
150    type Min = AtLeastOnce;
151}
152
153#[sealed::sealed]
154#[diagnostic::on_unimplemented(
155    message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
156    label = "required here",
157    note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
158)]
159/// Marker trait that is implemented for the [`TotalOrder`] ordering guarantee.
160pub trait IsOrdered: Ordering {}
161
162#[sealed::sealed]
163#[diagnostic::do_not_recommend]
164impl IsOrdered for TotalOrder {}
165
166#[sealed::sealed]
167#[diagnostic::on_unimplemented(
168    message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
169    label = "required here",
170    note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
171)]
172/// Marker trait that is implemented for the [`ExactlyOnce`] retries guarantee.
173pub trait IsExactlyOnce: Retries {}
174
175#[sealed::sealed]
176#[diagnostic::do_not_recommend]
177impl IsExactlyOnce for ExactlyOnce {}
178
179/// Streaming sequence of elements with type `Type`.
180///
181/// This live collection represents a growing sequence of elements, with new elements being
182/// asynchronously appended to the end of the sequence. This can be used to model the arrival
183/// of network input, such as API requests, or streaming ingestion.
184///
185/// By default, all streams have deterministic ordering and each element is materialized exactly
186/// once. But streams can also capture non-determinism via the `Order` and `Retries` type
187/// parameters. When the ordering / retries guarantee is relaxed, fewer APIs will be available
188/// on the stream. For example, if the stream is unordered, you cannot invoke [`Stream::first`].
189///
190/// Type Parameters:
191/// - `Type`: the type of elements in the stream
192/// - `Loc`: the location where the stream is being materialized
193/// - `Bound`: the boundedness of the stream, which is either [`Bounded`] or [`Unbounded`]
194/// - `Order`: the ordering of the stream, which is either [`TotalOrder`] or [`NoOrder`]
195///   (default is [`TotalOrder`])
196/// - `Retries`: the retry guarantee of the stream, which is either [`ExactlyOnce`] or
197///   [`AtLeastOnce`] (default is [`ExactlyOnce`])
198pub struct Stream<
199    Type,
200    Loc,
201    Bound: Boundedness = Unbounded,
202    Order: Ordering = TotalOrder,
203    Retry: Retries = ExactlyOnce,
204> {
205    pub(crate) location: Loc,
206    pub(crate) ir_node: RefCell<HydroNode>,
207    pub(crate) flow_state: FlowState,
208
209    _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
210}
211
212impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
213    fn drop(&mut self) {
214        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
215        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
216            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
217                input: Box::new(ir_node),
218                op_metadata: HydroIrOpMetadata::new(),
219            });
220        }
221    }
222}
223
224impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
225    for Stream<T, L, Unbounded, O, R>
226where
227    L: Location<'a>,
228{
229    fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
230        let new_meta = stream
231            .location
232            .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
233
234        Stream {
235            location: stream.location.clone(),
236            flow_state: stream.flow_state.clone(),
237            ir_node: RefCell::new(HydroNode::Cast {
238                inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
239                metadata: new_meta,
240            }),
241            _phantom: PhantomData,
242        }
243    }
244}
245
246impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
247    for Stream<T, L, B, NoOrder, R>
248where
249    L: Location<'a>,
250{
251    fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
252        stream.weaken_ordering()
253    }
254}
255
256impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
257    for Stream<T, L, B, O, AtLeastOnce>
258where
259    L: Location<'a>,
260{
261    fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
262        stream.weaken_retries()
263    }
264}
265
266impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
267where
268    L: Location<'a>,
269{
270    fn defer_tick(self) -> Self {
271        Stream::defer_tick(self)
272    }
273}
274
275impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
276    for Stream<T, Tick<L>, Bounded, O, R>
277where
278    L: Location<'a>,
279{
280    type Location = Tick<L>;
281
282    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
283        Stream::new(
284            location.clone(),
285            HydroNode::CycleSource {
286                cycle_id,
287                metadata: location.new_node_metadata(Self::collection_kind()),
288            },
289        )
290    }
291}
292
293impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
294    for Stream<T, Tick<L>, Bounded, O, R>
295where
296    L: Location<'a>,
297{
298    type Location = Tick<L>;
299
300    fn location(&self) -> &Self::Location {
301        self.location()
302    }
303
304    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
305        let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
306            location.clone(),
307            HydroNode::DeferTick {
308                input: Box::new(HydroNode::CycleSource {
309                    cycle_id,
310                    metadata: location.new_node_metadata(Self::collection_kind()),
311                }),
312                metadata: location.new_node_metadata(Self::collection_kind()),
313            },
314        );
315
316        from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
317    }
318}
319
320impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
321    for Stream<T, Tick<L>, Bounded, O, R>
322where
323    L: Location<'a>,
324{
325    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
326        assert_eq!(
327            Location::id(&self.location),
328            expected_location,
329            "locations do not match"
330        );
331        self.location
332            .flow_state()
333            .borrow_mut()
334            .push_root(HydroRoot::CycleSink {
335                cycle_id,
336                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
337                op_metadata: HydroIrOpMetadata::new(),
338            });
339    }
340}
341
342impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
343    for Stream<T, L, B, O, R>
344where
345    L: Location<'a>,
346{
347    type Location = L;
348
349    fn create_source(cycle_id: CycleId, location: L) -> Self {
350        Stream::new(
351            location.clone(),
352            HydroNode::CycleSource {
353                cycle_id,
354                metadata: location.new_node_metadata(Self::collection_kind()),
355            },
356        )
357    }
358}
359
360impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
361    for Stream<T, L, B, O, R>
362where
363    L: Location<'a>,
364{
365    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
366        assert_eq!(
367            Location::id(&self.location),
368            expected_location,
369            "locations do not match"
370        );
371        self.location
372            .flow_state()
373            .borrow_mut()
374            .push_root(HydroRoot::CycleSink {
375                cycle_id,
376                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
377                op_metadata: HydroIrOpMetadata::new(),
378            });
379    }
380}
381
382impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
383where
384    T: Clone,
385    L: Location<'a>,
386{
387    fn clone(&self) -> Self {
388        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
389            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
390            *self.ir_node.borrow_mut() = HydroNode::Tee {
391                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
392                metadata: self.location.new_node_metadata(Self::collection_kind()),
393            };
394        }
395
396        let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
397            unreachable!()
398        };
399        Stream {
400            location: self.location.clone(),
401            flow_state: self.flow_state.clone(),
402            ir_node: HydroNode::Tee {
403                inner: SharedNode(inner.0.clone()),
404                metadata: metadata.clone(),
405            }
406            .into(),
407            _phantom: PhantomData,
408        }
409    }
410}
411
412impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
413where
414    L: Location<'a>,
415{
416    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
417        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
418        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
419
420        let flow_state = location.flow_state().clone();
421        Stream {
422            location,
423            flow_state,
424            ir_node: RefCell::new(ir_node),
425            _phantom: PhantomData,
426        }
427    }
428
429    /// Returns the [`Location`] where this stream is being materialized.
430    pub fn location(&self) -> &L {
431        &self.location
432    }
433
434    /// Creates a shared reference handle to this stream's handoff buffer that can be captured
435    /// inside `q!()` closures. The handle resolves to `&Vec<T>` at runtime.
436    ///
437    /// The stream must be bounded, otherwise reading it would be non-deterministic.
438    pub fn by_ref(&self) -> crate::handoff_ref::StreamRef<'a, '_, T, L>
439    where
440        B: IsBounded,
441    {
442        crate::handoff_ref::StreamRef::new(&self.ir_node)
443    }
444
445    /// Returns a mutable reference handle to this stream's handoff buffer that can be captured
446    /// inside `q!()` closures. The handle resolves to `&mut Vec<T>` at runtime.
447    pub fn by_mut(&self) -> crate::handoff_ref::StreamMut<'a, '_, T, L>
448    where
449        B: IsBounded,
450    {
451        crate::handoff_ref::StreamMut::new(&self.ir_node)
452    }
453
454    /// Weakens the consistency of this live collection to not guarantee any consistency across
455    /// cluster members (if this collection is on a cluster).
456    pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
457    where
458        L: Location<'a>,
459    {
460        if L::consistency()
461            .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
462        {
463            // already no consistency
464            Stream::new(
465                self.location.drop_consistency(),
466                self.ir_node.replace(HydroNode::Placeholder),
467            )
468        } else {
469            Stream::new(
470                self.location.drop_consistency(),
471                HydroNode::Cast {
472                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
473                    metadata: self.location.drop_consistency().new_node_metadata(Stream::<
474                        T,
475                        L::DropConsistency,
476                        B,
477                        O,
478                        R,
479                    >::collection_kind(
480                    )),
481                },
482            )
483        }
484    }
485
486    /// Casts this live collection to have the consistency guarantees specified in the given
487    /// location type parameter. The developer must ensure that the strengthened consistency
488    /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
489    pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
490        self,
491        _proof: impl crate::properties::ConsistencyProof,
492    ) -> Stream<T, L2, B, O, R>
493    where
494        L: Location<'a>,
495    {
496        if L::consistency() == L2::consistency() {
497            Stream::new(
498                self.location.with_consistency_of(),
499                self.ir_node.replace(HydroNode::Placeholder),
500            )
501        } else {
502            Stream::new(
503                self.location.with_consistency_of(),
504                HydroNode::AssertIsConsistent {
505                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
506                    trusted: false,
507                    metadata: self
508                        .location
509                        .clone()
510                        .with_consistency_of::<L2>()
511                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
512                },
513            )
514        }
515    }
516
517    pub(crate) fn assert_has_consistency_of_trusted<
518        L2: Location<'a, DropConsistency = L::DropConsistency>,
519    >(
520        self,
521        _proof: impl crate::properties::ConsistencyProof,
522    ) -> Stream<T, L2, B, O, R>
523    where
524        L: Location<'a>,
525    {
526        if L::consistency() == L2::consistency() {
527            Stream::new(
528                self.location.with_consistency_of(),
529                self.ir_node.replace(HydroNode::Placeholder),
530            )
531        } else {
532            Stream::new(
533                self.location.with_consistency_of(),
534                HydroNode::AssertIsConsistent {
535                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
536                    trusted: true,
537                    metadata: self
538                        .location
539                        .clone()
540                        .with_consistency_of::<L2>()
541                        .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
542                },
543            )
544        }
545    }
546
547    pub(crate) fn collection_kind() -> CollectionKind {
548        CollectionKind::Stream {
549            bound: B::BOUND_KIND,
550            order: O::ORDERING_KIND,
551            retry: R::RETRIES_KIND,
552            element_type: quote_type::<T>().into(),
553        }
554    }
555
556    /// Produces a stream based on invoking `f` on each element.
557    /// If you do not want to modify the stream and instead only want to view
558    /// each item use [`Stream::inspect`] instead.
559    ///
560    /// # Example
561    /// ```rust
562    /// # #[cfg(feature = "deploy")] {
563    /// # use hydro_lang::prelude::*;
564    /// # use futures::StreamExt;
565    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
566    /// let words = process.source_iter(q!(vec!["hello", "world"]));
567    /// words.map(q!(|x| x.to_uppercase()))
568    /// # }, |mut stream| async move {
569    /// # for w in vec!["HELLO", "WORLD"] {
570    /// #     assert_eq!(stream.next().await.unwrap(), w);
571    /// # }
572    /// # }));
573    /// # }
574    /// ```
575    pub fn map<U, F, C, I, const WAS_MUT: bool>(
576        self,
577        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, I>>,
578    ) -> Stream<U, L, B, O, R>
579    where
580        F: FnMut(T) -> U + 'a,
581        C: ValidMutCommutativityFor<F, T, U, O, WAS_MUT>,
582        I: ValidMutIdempotenceFor<F, T, U, R, WAS_MUT>,
583    {
584        let f = crate::handoff_ref::with_ref_capture(|| {
585            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
586            proof.register_proof(&expr);
587            expr.into()
588        });
589        Stream::new(
590            self.location.clone(),
591            HydroNode::Map {
592                f,
593                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
594                metadata: self
595                    .location
596                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
597            },
598        )
599    }
600
601    /// For each item `i` in the input stream, transform `i` using `f` and then treat the
602    /// result as an [`Iterator`] to produce items one by one. The implementation for [`Iterator`]
603    /// for the output type `U` must produce items in a **deterministic** order.
604    ///
605    /// For example, `U` could be a `Vec`, but not a `HashSet`. If the order of the items in `U` is
606    /// not deterministic, use [`Stream::flat_map_unordered`] instead.
607    ///
608    /// # Example
609    /// ```rust
610    /// # #[cfg(feature = "deploy")] {
611    /// # use hydro_lang::prelude::*;
612    /// # use futures::StreamExt;
613    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
614    /// process
615    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
616    ///     .flat_map_ordered(q!(|x| x))
617    /// # }, |mut stream| async move {
618    /// // 1, 2, 3, 4
619    /// # for w in (1..5) {
620    /// #     assert_eq!(stream.next().await.unwrap(), w);
621    /// # }
622    /// # }));
623    /// # }
624    /// ```
625    pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
626        self,
627        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
628    ) -> Stream<U, L, B, O, R>
629    where
630        I: IntoIterator<Item = U>,
631        F: FnMut(T) -> I + 'a,
632        C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
633        Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
634    {
635        let f = crate::handoff_ref::with_ref_capture(|| {
636            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
637            proof.register_proof(&expr);
638            expr.into()
639        });
640        Stream::new(
641            self.location.clone(),
642            HydroNode::FlatMap {
643                f,
644                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
645                metadata: self
646                    .location
647                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
648            },
649        )
650    }
651
652    /// Like [`Stream::flat_map_ordered`], but allows the implementation of [`Iterator`]
653    /// for the output type `U` to produce items in any order.
654    ///
655    /// # Example
656    /// ```rust
657    /// # #[cfg(feature = "deploy")] {
658    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
659    /// # use futures::StreamExt;
660    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
661    /// process
662    ///     .source_iter(q!(vec![
663    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
664    ///         std::collections::HashSet::from_iter(vec![3, 4]),
665    ///     ]))
666    ///     .flat_map_unordered(q!(|x| x))
667    /// # }, |mut stream| async move {
668    /// // 1, 2, 3, 4, but in no particular order
669    /// # let mut results = Vec::new();
670    /// # for w in (1..5) {
671    /// #     results.push(stream.next().await.unwrap());
672    /// # }
673    /// # results.sort();
674    /// # assert_eq!(results, vec![1, 2, 3, 4]);
675    /// # }));
676    /// # }
677    /// ```
678    pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
679        self,
680        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
681    ) -> Stream<U, L, B, NoOrder, R>
682    where
683        I: IntoIterator<Item = U>,
684        F: FnMut(T) -> I + 'a,
685        C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
686        Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
687    {
688        let f = crate::handoff_ref::with_ref_capture(|| {
689            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
690            proof.register_proof(&expr);
691            expr.into()
692        });
693        Stream::new(
694            self.location.clone(),
695            HydroNode::FlatMap {
696                f,
697                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
698                metadata: self
699                    .location
700                    .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
701            },
702        )
703    }
704
705    /// For each item `i` in the input stream, treat `i` as an [`Iterator`] and produce its items one by one.
706    /// The implementation for [`Iterator`] for the element type `T` must produce items in a **deterministic** order.
707    ///
708    /// For example, `T` could be a `Vec`, but not a `HashSet`. If the order of the items in `T` is
709    /// not deterministic, use [`Stream::flatten_unordered`] instead.
710    ///
711    /// ```rust
712    /// # #[cfg(feature = "deploy")] {
713    /// # use hydro_lang::prelude::*;
714    /// # use futures::StreamExt;
715    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
716    /// process
717    ///     .source_iter(q!(vec![vec![1, 2], vec![3, 4]]))
718    ///     .flatten_ordered()
719    /// # }, |mut stream| async move {
720    /// // 1, 2, 3, 4
721    /// # for w in (1..5) {
722    /// #     assert_eq!(stream.next().await.unwrap(), w);
723    /// # }
724    /// # }));
725    /// # }
726    /// ```
727    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
728    where
729        T: IntoIterator<Item = U>,
730    {
731        self.flat_map_ordered(q!(|d| d))
732    }
733
734    /// Like [`Stream::flatten_ordered`], but allows the implementation of [`Iterator`]
735    /// for the element type `T` to produce items in any order.
736    ///
737    /// # Example
738    /// ```rust
739    /// # #[cfg(feature = "deploy")] {
740    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
741    /// # use futures::StreamExt;
742    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
743    /// process
744    ///     .source_iter(q!(vec![
745    ///         std::collections::HashSet::<i32>::from_iter(vec![1, 2]),
746    ///         std::collections::HashSet::from_iter(vec![3, 4]),
747    ///     ]))
748    ///     .flatten_unordered()
749    /// # }, |mut stream| async move {
750    /// // 1, 2, 3, 4, but in no particular order
751    /// # let mut results = Vec::new();
752    /// # for w in (1..5) {
753    /// #     results.push(stream.next().await.unwrap());
754    /// # }
755    /// # results.sort();
756    /// # assert_eq!(results, vec![1, 2, 3, 4]);
757    /// # }));
758    /// # }
759    /// ```
760    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
761    where
762        T: IntoIterator<Item = U>,
763    {
764        self.flat_map_unordered(q!(|d| d))
765    }
766
767    /// For each item in the input stream, apply `f` to produce a [`futures::stream::Stream`],
768    /// then emit the elements of that stream one by one. When the inner stream yields
769    /// `Pending`, this operator yields as well.
770    pub fn flat_map_stream_blocking<U, S, F, C, Idemp, const WAS_MUT: bool>(
771        self,
772        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
773    ) -> Stream<U, L, B, O, R>
774    where
775        S: futures::Stream<Item = U>,
776        F: FnMut(T) -> S + 'a,
777        C: ValidMutCommutativityFor<F, T, S, O, WAS_MUT>,
778        Idemp: ValidMutIdempotenceFor<F, T, S, R, WAS_MUT>,
779    {
780        let f = crate::handoff_ref::with_ref_capture(|| {
781            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
782            proof.register_proof(&expr);
783            expr.into()
784        });
785        Stream::new(
786            self.location.clone(),
787            HydroNode::FlatMapStreamBlocking {
788                f,
789                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
790                metadata: self
791                    .location
792                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
793            },
794        )
795    }
796
797    /// For each item in the input stream, treat it as a [`futures::stream::Stream`] and
798    /// emit its elements one by one. When the inner stream yields `Pending`, this operator
799    /// yields as well.
800    pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
801    where
802        T: futures::Stream<Item = U>,
803    {
804        self.flat_map_stream_blocking(q!(|d| d))
805    }
806
807    /// Creates a stream containing only the elements of the input stream that satisfy a predicate
808    /// `f`, preserving the order of the elements.
809    ///
810    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
811    /// not modify or take ownership of the values. If you need to modify the values while filtering
812    /// use [`Stream::filter_map`] instead.
813    ///
814    /// # Example
815    /// ```rust
816    /// # #[cfg(feature = "deploy")] {
817    /// # use hydro_lang::prelude::*;
818    /// # use futures::StreamExt;
819    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
820    /// process
821    ///     .source_iter(q!(vec![1, 2, 3, 4]))
822    ///     .filter(q!(|&x| x > 2))
823    /// # }, |mut stream| async move {
824    /// // 3, 4
825    /// # for w in (3..5) {
826    /// #     assert_eq!(stream.next().await.unwrap(), w);
827    /// # }
828    /// # }));
829    /// # }
830    /// ```
831    pub fn filter<F, C, Idemp, const WAS_MUT: bool>(
832        self,
833        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
834    ) -> Self
835    where
836        F: FnMut(&T) -> bool + 'a,
837        C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
838        Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
839    {
840        let f = crate::handoff_ref::with_ref_capture(|| {
841            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
842            proof.register_proof(&expr);
843            expr.into()
844        });
845        Stream::new(
846            self.location.clone(),
847            HydroNode::Filter {
848                f,
849                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
850                metadata: self.location.new_node_metadata(Self::collection_kind()),
851            },
852        )
853    }
854
855    /// Splits the stream into two streams based on a predicate, without cloning elements.
856    ///
857    /// Elements for which `f` returns `true` are sent to the first output stream,
858    /// and elements for which `f` returns `false` are sent to the second output stream.
859    ///
860    /// Unlike using `filter` twice, this only evaluates the predicate once per element
861    /// and does not require `T: Clone`.
862    ///
863    /// The closure `f` receives a reference `&T` rather than an owned value `T` because
864    /// the predicate is only used for routing; the element itself is moved to the
865    /// appropriate output stream.
866    ///
867    /// # Example
868    /// ```rust
869    /// # #[cfg(feature = "deploy")] {
870    /// # use hydro_lang::prelude::*;
871    /// # use hydro_lang::live_collections::stream::{NoOrder, ExactlyOnce};
872    /// # use futures::StreamExt;
873    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
874    /// let numbers: Stream<_, _, Unbounded> = process.source_iter(q!(vec![1, 2, 3, 4, 5, 6])).into();
875    /// let (evens, odds) = numbers.partition(q!(|&x| x % 2 == 0));
876    /// // evens: 2, 4, 6 tagged with true; odds: 1, 3, 5 tagged with false
877    /// evens.map(q!(|x| (x, true)))
878    ///     .merge_unordered(odds.map(q!(|x| (x, false))))
879    /// # }, |mut stream| async move {
880    /// # let mut results = Vec::new();
881    /// # for _ in 0..6 {
882    /// #     results.push(stream.next().await.unwrap());
883    /// # }
884    /// # results.sort();
885    /// # assert_eq!(results, vec![(1, false), (2, true), (3, false), (4, true), (5, false), (6, true)]);
886    /// # }));
887    /// # }
888    /// ```
889    pub fn partition<F, C, Idemp, const WAS_MUT: bool>(
890        self,
891        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
892    ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
893    where
894        F: FnMut(&T) -> bool + 'a,
895        C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
896        Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
897    {
898        let f = crate::handoff_ref::with_ref_capture(|| {
899            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
900            proof.register_proof(&expr);
901            expr.into()
902        });
903        let shared = SharedNode(Rc::new(RefCell::new(
904            self.ir_node.replace(HydroNode::Placeholder),
905        )));
906
907        let true_stream = Stream::new(
908            self.location.clone(),
909            HydroNode::Partition {
910                inner: SharedNode(shared.0.clone()),
911                f: f.clone(),
912                is_true: true,
913                metadata: self.location.new_node_metadata(Self::collection_kind()),
914            },
915        );
916
917        let false_stream = Stream::new(
918            self.location.clone(),
919            HydroNode::Partition {
920                inner: SharedNode(shared.0),
921                f,
922                is_true: false,
923                metadata: self.location.new_node_metadata(Self::collection_kind()),
924            },
925        );
926
927        (true_stream, false_stream)
928    }
929
930    /// An operator that both filters and maps. It yields only the items for which the supplied closure `f` returns `Some(value)`.
931    ///
932    /// # Example
933    /// ```rust
934    /// # #[cfg(feature = "deploy")] {
935    /// # use hydro_lang::prelude::*;
936    /// # use futures::StreamExt;
937    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
938    /// process
939    ///     .source_iter(q!(vec!["1", "hello", "world", "2"]))
940    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
941    /// # }, |mut stream| async move {
942    /// // 1, 2
943    /// # for w in (1..3) {
944    /// #     assert_eq!(stream.next().await.unwrap(), w);
945    /// # }
946    /// # }));
947    /// # }
948    /// ```
949    pub fn filter_map<U, F, C, Idemp, const WAS_MUT: bool>(
950        self,
951        f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
952    ) -> Stream<U, L, B, O, R>
953    where
954        F: FnMut(T) -> Option<U> + 'a,
955        C: ValidMutCommutativityFor<F, T, Option<U>, O, WAS_MUT>,
956        Idemp: ValidMutIdempotenceFor<F, T, Option<U>, R, WAS_MUT>,
957    {
958        let f = crate::handoff_ref::with_ref_capture(|| {
959            let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
960            proof.register_proof(&expr);
961            expr.into()
962        });
963        Stream::new(
964            self.location.clone(),
965            HydroNode::FilterMap {
966                f,
967                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
968                metadata: self
969                    .location
970                    .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
971            },
972        )
973    }
974
975    /// Generates a stream that maps each input element `i` to a tuple `(i, x)`,
976    /// where `x` is the final value of `other`, a bounded [`Singleton`] or [`Optional`].
977    /// If `other` is an empty [`Optional`], no values will be produced.
978    ///
979    /// # Example
980    /// ```rust
981    /// # #[cfg(feature = "deploy")] {
982    /// # use hydro_lang::prelude::*;
983    /// # use futures::StreamExt;
984    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
985    /// let tick = process.tick();
986    /// let batch = process
987    ///   .source_iter(q!(vec![1, 2, 3, 4]))
988    ///   .batch(&tick, nondet!(/** test */));
989    /// let count = batch.clone().count(); // `count()` returns a singleton
990    /// batch.cross_singleton(count).all_ticks()
991    /// # }, |mut stream| async move {
992    /// // (1, 4), (2, 4), (3, 4), (4, 4)
993    /// # for w in vec![(1, 4), (2, 4), (3, 4), (4, 4)] {
994    /// #     assert_eq!(stream.next().await.unwrap(), w);
995    /// # }
996    /// # }));
997    /// # }
998    /// ```
999    pub fn cross_singleton<O2>(
1000        self,
1001        other: impl Into<Optional<O2, L, Bounded>>,
1002    ) -> Stream<(T, O2), L, B, O, R>
1003    where
1004        O2: Clone,
1005    {
1006        let other: Optional<O2, L, Bounded> = other.into();
1007        check_matching_location(&self.location, &other.location);
1008
1009        Stream::new(
1010            self.location.clone(),
1011            HydroNode::CrossSingleton {
1012                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1013                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1014                metadata: self
1015                    .location
1016                    .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
1017            },
1018        )
1019    }
1020
1021    /// Passes this stream through if the boolean signal is `true`, otherwise the output is empty.
1022    ///
1023    /// # Example
1024    /// ```rust
1025    /// # #[cfg(feature = "deploy")] {
1026    /// # use hydro_lang::prelude::*;
1027    /// # use futures::StreamExt;
1028    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1029    /// let tick = process.tick();
1030    /// // ticks are lazy by default, forces the second tick to run
1031    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1032    ///
1033    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
1034    /// let batch_first_tick = process
1035    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1036    ///   .batch(&tick, nondet!(/** test */));
1037    /// let batch_second_tick = process
1038    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1039    ///   .batch(&tick, nondet!(/** test */))
1040    ///   .defer_tick();
1041    /// batch_first_tick.chain(batch_second_tick)
1042    ///   .filter_if(signal)
1043    ///   .all_ticks()
1044    /// # }, |mut stream| async move {
1045    /// // [1, 2, 3, 4]
1046    /// # for w in vec![1, 2, 3, 4] {
1047    /// #     assert_eq!(stream.next().await.unwrap(), w);
1048    /// # }
1049    /// # }));
1050    /// # }
1051    /// ```
1052    pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
1053        self.cross_singleton(signal.filter(q!(|b| *b)))
1054            .map(q!(|(d, _)| d))
1055    }
1056
1057    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is empty.
1058    ///
1059    /// Useful for gating the release of elements based on a condition, such as only processing requests if you are the
1060    /// leader of a cluster.
1061    ///
1062    /// # Example
1063    /// ```rust
1064    /// # #[cfg(feature = "deploy")] {
1065    /// # use hydro_lang::prelude::*;
1066    /// # use futures::StreamExt;
1067    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1068    /// let tick = process.tick();
1069    /// // ticks are lazy by default, forces the second tick to run
1070    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1071    ///
1072    /// let batch_first_tick = process
1073    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1074    ///   .batch(&tick, nondet!(/** test */));
1075    /// let batch_second_tick = process
1076    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1077    ///   .batch(&tick, nondet!(/** test */))
1078    ///   .defer_tick(); // appears on the second tick
1079    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1080    /// batch_first_tick.chain(batch_second_tick)
1081    ///   .filter_if_some(some_on_first_tick)
1082    ///   .all_ticks()
1083    /// # }, |mut stream| async move {
1084    /// // [1, 2, 3, 4]
1085    /// # for w in vec![1, 2, 3, 4] {
1086    /// #     assert_eq!(stream.next().await.unwrap(), w);
1087    /// # }
1088    /// # }));
1089    /// # }
1090    /// ```
1091    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1092    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1093        self.filter_if(signal.is_some())
1094    }
1095
1096    /// Passes this stream through if the argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is empty.
1097    ///
1098    /// Useful for gating the release of elements based on a condition, such as triggering a protocol if you are missing
1099    /// some local state.
1100    ///
1101    /// # Example
1102    /// ```rust
1103    /// # #[cfg(feature = "deploy")] {
1104    /// # use hydro_lang::prelude::*;
1105    /// # use futures::StreamExt;
1106    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1107    /// let tick = process.tick();
1108    /// // ticks are lazy by default, forces the second tick to run
1109    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1110    ///
1111    /// let batch_first_tick = process
1112    ///   .source_iter(q!(vec![1, 2, 3, 4]))
1113    ///   .batch(&tick, nondet!(/** test */));
1114    /// let batch_second_tick = process
1115    ///   .source_iter(q!(vec![5, 6, 7, 8]))
1116    ///   .batch(&tick, nondet!(/** test */))
1117    ///   .defer_tick(); // appears on the second tick
1118    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1119    /// batch_first_tick.chain(batch_second_tick)
1120    ///   .filter_if_none(some_on_first_tick)
1121    ///   .all_ticks()
1122    /// # }, |mut stream| async move {
1123    /// // [5, 6, 7, 8]
1124    /// # for w in vec![5, 6, 7, 8] {
1125    /// #     assert_eq!(stream.next().await.unwrap(), w);
1126    /// # }
1127    /// # }));
1128    /// # }
1129    /// ```
1130    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1131    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1132        self.filter_if(other.is_none())
1133    }
1134
1135    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams,
1136    /// returning all tupled pairs.
1137    ///
1138    /// When the right side is [`Bounded`], it is accumulated first and the left side streams
1139    /// through, preserving the left side's ordering. When both sides are [`Unbounded`], a
1140    /// symmetric hash join is used and ordering is [`NoOrder`].
1141    ///
1142    /// # Example
1143    /// ```rust
1144    /// # #[cfg(feature = "deploy")] {
1145    /// # use hydro_lang::prelude::*;
1146    /// # use std::collections::HashSet;
1147    /// # use futures::StreamExt;
1148    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1149    /// let tick = process.tick();
1150    /// let stream1 = process.source_iter(q!(vec![1, 2]));
1151    /// let stream2 = process.source_iter(q!(vec!['a', 'b']));
1152    /// stream1.cross_product(stream2)
1153    /// # }, |mut stream| async move {
1154    /// // (1, 'a'), (1, 'b'), (2, 'a'), (2, 'b') in any order
1155    /// # let expected = HashSet::from([(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]);
1156    /// # stream.map(|i| assert!(expected.contains(&i)));
1157    /// # }));
1158    /// # }
1159    pub fn cross_product<T2, B2: Boundedness, O2: Ordering, R2: Retries>(
1160        self,
1161        other: Stream<T2, L, B2, O2, R2>,
1162    ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
1163    where
1164        T: Clone,
1165        T2: Clone,
1166        R: MinRetries<R2>,
1167    {
1168        self.map(q!(|v| ((), v)))
1169            .join(other.map(q!(|v| ((), v))))
1170            .map(q!(|((), (v1, v2))| (v1, v2)))
1171    }
1172
1173    /// Takes one stream as input and filters out any duplicate occurrences. The output
1174    /// contains all unique values from the input.
1175    ///
1176    /// # Example
1177    /// ```rust
1178    /// # #[cfg(feature = "deploy")] {
1179    /// # use hydro_lang::prelude::*;
1180    /// # use futures::StreamExt;
1181    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1182    /// let tick = process.tick();
1183    /// process.source_iter(q!(vec![1, 2, 3, 2, 1, 4])).unique()
1184    /// # }, |mut stream| async move {
1185    /// # for w in vec![1, 2, 3, 4] {
1186    /// #     assert_eq!(stream.next().await.unwrap(), w);
1187    /// # }
1188    /// # }));
1189    /// # }
1190    /// ```
1191    pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1192    where
1193        T: Eq + Hash,
1194    {
1195        Stream::new(
1196            self.location.clone(),
1197            HydroNode::Unique {
1198                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1199                metadata: self
1200                    .location
1201                    .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1202            },
1203        )
1204    }
1205
1206    /// Outputs everything in this stream that is *not* contained in the `other` stream.
1207    ///
1208    /// The `other` stream must be [`Bounded`], since this function will wait until
1209    /// all its elements are available before producing any output.
1210    /// # Example
1211    /// ```rust
1212    /// # #[cfg(feature = "deploy")] {
1213    /// # use hydro_lang::prelude::*;
1214    /// # use futures::StreamExt;
1215    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1216    /// let tick = process.tick();
1217    /// let stream = process
1218    ///   .source_iter(q!(vec![ 1, 2, 3, 4 ]))
1219    ///   .batch(&tick, nondet!(/** test */));
1220    /// let batch = process
1221    ///   .source_iter(q!(vec![1, 2]))
1222    ///   .batch(&tick, nondet!(/** test */));
1223    /// stream.filter_not_in(batch).all_ticks()
1224    /// # }, |mut stream| async move {
1225    /// # for w in vec![3, 4] {
1226    /// #     assert_eq!(stream.next().await.unwrap(), w);
1227    /// # }
1228    /// # }));
1229    /// # }
1230    /// ```
1231    pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1232    where
1233        T: Eq + Hash,
1234        B2: IsBounded,
1235    {
1236        check_matching_location(&self.location, &other.location);
1237
1238        Stream::new(
1239            self.location.clone(),
1240            HydroNode::Difference {
1241                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1242                neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1243                metadata: self
1244                    .location
1245                    .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1246            },
1247        )
1248    }
1249
1250    /// An operator which allows you to "inspect" each element of a stream without
1251    /// modifying it. The closure `f` is called on a reference to each item. This is
1252    /// mainly useful for debugging, and should not be used to generate side-effects.
1253    ///
1254    /// # Example
1255    /// ```rust
1256    /// # #[cfg(feature = "deploy")] {
1257    /// # use hydro_lang::prelude::*;
1258    /// # use futures::StreamExt;
1259    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1260    /// let nums = process.source_iter(q!(vec![1, 2]));
1261    /// // prints "1 * 10 = 10" and "2 * 10 = 20"
1262    /// nums.inspect(q!(|x| println!("{} * 10 = {}", x, x * 10)))
1263    /// # }, |mut stream| async move {
1264    /// # for w in vec![1, 2] {
1265    /// #     assert_eq!(stream.next().await.unwrap(), w);
1266    /// # }
1267    /// # }));
1268    /// # }
1269    /// ```
1270    pub fn inspect<F, C, Idemp, const WAS_MUT: bool>(
1271        self,
1272        f: impl IntoQuotedMut<'a, F, L::DropConsistency, StreamMapFuncAlgebra<C, Idemp>>,
1273    ) -> Self
1274    where
1275        F: FnMut(&T) + 'a,
1276        C: ValidMutBorrowCommutativityFor<F, T, (), O, WAS_MUT>,
1277        Idemp: ValidMutBorrowIdempotenceFor<F, T, (), R, WAS_MUT>,
1278    {
1279        let f = crate::handoff_ref::with_ref_capture(|| {
1280            let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location.drop_consistency());
1281            proof.register_proof(&expr);
1282            expr.into()
1283        });
1284
1285        Stream::new(
1286            self.location.clone(),
1287            HydroNode::Inspect {
1288                f,
1289                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1290                metadata: self.location.new_node_metadata(Self::collection_kind()),
1291            },
1292        )
1293    }
1294
1295    /// Executes the provided closure for every element in this stream.
1296    ///
1297    /// Because the closure may have side effects, the stream must have deterministic order
1298    /// ([`TotalOrder`]) and no retries ([`ExactlyOnce`]). If the side effects can tolerate
1299    /// out-of-order or duplicate execution, use [`Stream::assume_ordering`] and
1300    /// [`Stream::assume_retries`] with an explanation for why this is the case.
1301    ///
1302    /// The closure may capture singletons via `by_ref()` or `by_mut()`. No commutativity
1303    /// or idempotence proofs are needed because the `TotalOrder + ExactlyOnce` requirements
1304    /// already guarantee deterministic execution.
1305    pub fn for_each<F: FnMut(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1306    where
1307        O: IsOrdered,
1308        R: IsExactlyOnce,
1309    {
1310        let f = crate::handoff_ref::with_ref_capture(|| f.splice_fnmut1_ctx(&self.location).into());
1311        self.location
1312            .flow_state()
1313            .borrow_mut()
1314            .push_root(HydroRoot::ForEach {
1315                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1316                f,
1317                op_metadata: HydroIrOpMetadata::new(),
1318            });
1319    }
1320
1321    /// Sends all elements of this stream to a provided [`futures::Sink`], such as an external
1322    /// TCP socket to some other server. You should _not_ use this API for interacting with
1323    /// external clients, instead see [`Location::bidi_external_many_bytes`] and
1324    /// [`Location::bidi_external_many_bincode`]. This should be used for custom, low-level
1325    /// interaction with asynchronous sinks.
1326    pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1327    where
1328        O: IsOrdered,
1329        R: IsExactlyOnce,
1330        S: 'a + futures::Sink<T> + Unpin,
1331    {
1332        self.location
1333            .flow_state()
1334            .borrow_mut()
1335            .push_root(HydroRoot::DestSink {
1336                sink: sink.splice_typed_ctx(&self.location).into(),
1337                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1338                op_metadata: HydroIrOpMetadata::new(),
1339            });
1340    }
1341
1342    /// Maps each element `x` of the stream to `(i, x)`, where `i` is the index of the element.
1343    ///
1344    /// # Example
1345    /// ```rust
1346    /// # #[cfg(feature = "deploy")] {
1347    /// # use hydro_lang::{prelude::*, live_collections::stream::{TotalOrder, ExactlyOnce}};
1348    /// # use futures::StreamExt;
1349    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, TotalOrder, ExactlyOnce>(|process| {
1350    /// let tick = process.tick();
1351    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1352    /// numbers.enumerate()
1353    /// # }, |mut stream| async move {
1354    /// // (0, 1), (1, 2), (2, 3), (3, 4)
1355    /// # for w in vec![(0, 1), (1, 2), (2, 3), (3, 4)] {
1356    /// #     assert_eq!(stream.next().await.unwrap(), w);
1357    /// # }
1358    /// # }));
1359    /// # }
1360    /// ```
1361    pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1362    where
1363        O: IsOrdered,
1364        R: IsExactlyOnce,
1365    {
1366        Stream::new(
1367            self.location.clone(),
1368            HydroNode::Enumerate {
1369                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1370                metadata: self.location.new_node_metadata(Stream::<
1371                    (usize, T),
1372                    L,
1373                    B,
1374                    TotalOrder,
1375                    ExactlyOnce,
1376                >::collection_kind()),
1377            },
1378        )
1379    }
1380
1381    /// Combines elements of the stream into a [`Singleton`], by starting with an intitial value,
1382    /// generated by the `init` closure, and then applying the `comb` closure to each element in the stream.
1383    /// Unlike iterators, `comb` takes the accumulator by `&mut` reference, so that it can be modified in place.
1384    ///
1385    /// Depending on the input stream guarantees, the closure may need to be commutative
1386    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1387    ///
1388    /// # Example
1389    /// ```rust
1390    /// # #[cfg(feature = "deploy")] {
1391    /// # use hydro_lang::prelude::*;
1392    /// # use futures::StreamExt;
1393    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1394    /// let words = process.source_iter(q!(vec!["HELLO", "WORLD"]));
1395    /// words
1396    ///     .fold(q!(|| String::new()), q!(|acc, x| acc.push_str(x)))
1397    ///     .into_stream()
1398    /// # }, |mut stream| async move {
1399    /// // "HELLOWORLD"
1400    /// # assert_eq!(stream.next().await.unwrap(), "HELLOWORLD");
1401    /// # }));
1402    /// # }
1403    /// ```
1404    pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1405        self,
1406        init: impl IntoQuotedMut<'a, I, L>,
1407        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1408    ) -> Singleton<A, L, B2>
1409    where
1410        I: Fn() -> A + 'a,
1411        F: 'a + Fn(&mut A, T),
1412        C: ValidCommutativityFor<O>,
1413        Idemp: ValidIdempotenceFor<R>,
1414        B: ApplyMonotoneStream<M, B2>,
1415    {
1416        let init = init.splice_fn0_ctx(&self.location).into();
1417        let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1418        proof.register_proof(&comb);
1419
1420        // Only assume_retries (for idempotence), not assume_ordering.
1421        // The fold hook in the simulator handles ordering non-determinism directly.
1422        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1423        let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1424
1425        let core = HydroNode::Fold {
1426            init,
1427            acc: comb.into(),
1428            input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1429            metadata: retried
1430                .location
1431                .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1432            // we do not guarantee consistency at this point because if the algebraic properties
1433            // do not hold in practice, replica consistency may fail to be maintained, so we
1434            // would like the simulator to assert consistency; in the future, this will be dynamic
1435            // based on the proof mechanism
1436        };
1437
1438        Singleton::new(retried.location.clone(), core)
1439            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1440    }
1441
1442    /// Combines elements of the stream into an [`Optional`], by starting with the first element in the stream,
1443    /// and then applying the `comb` closure to each element in the stream. The [`Optional`] will be empty
1444    /// until the first element in the input arrives. Unlike iterators, `comb` takes the accumulator by `&mut`
1445    /// reference, so that it can be modified in place.
1446    ///
1447    /// Depending on the input stream guarantees, the closure may need to be commutative
1448    /// (for unordered streams) or idempotent (for streams with non-deterministic duplicates).
1449    ///
1450    /// # Example
1451    /// ```rust
1452    /// # #[cfg(feature = "deploy")] {
1453    /// # use hydro_lang::prelude::*;
1454    /// # use futures::StreamExt;
1455    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1456    /// let bools = process.source_iter(q!(vec![false, true, false]));
1457    /// bools.reduce(q!(|acc, x| *acc |= x)).into_stream()
1458    /// # }, |mut stream| async move {
1459    /// // true
1460    /// # assert_eq!(stream.next().await.unwrap(), true);
1461    /// # }));
1462    /// # }
1463    /// ```
1464    pub fn reduce<F, C, Idemp>(
1465        self,
1466        comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1467    ) -> Optional<T, L, B>
1468    where
1469        F: Fn(&mut T, T) + 'a,
1470        C: ValidCommutativityFor<O>,
1471        Idemp: ValidIdempotenceFor<R>,
1472    {
1473        let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1474        proof.register_proof(&f);
1475
1476        let nondet = nondet!(/** the combinator function is commutative and idempotent */);
1477        let ordered_etc: Stream<T, L::DropConsistency, B> =
1478            self.assume_retries(nondet).assume_ordering(nondet);
1479
1480        let core = HydroNode::Reduce {
1481            f: f.into(),
1482            input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1483            metadata: ordered_etc
1484                .location
1485                .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1486        };
1487
1488        Optional::new(ordered_etc.location.clone(), core)
1489            .assert_has_consistency_of(manual_proof!(/** algebraic properties */))
1490    }
1491
1492    /// Computes the maximum element in the stream as an [`Optional`], which
1493    /// will be empty until the first element in the input arrives.
1494    ///
1495    /// # Example
1496    /// ```rust
1497    /// # #[cfg(feature = "deploy")] {
1498    /// # use hydro_lang::prelude::*;
1499    /// # use futures::StreamExt;
1500    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1501    /// let tick = process.tick();
1502    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1503    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1504    /// batch.max().all_ticks()
1505    /// # }, |mut stream| async move {
1506    /// // 4
1507    /// # assert_eq!(stream.next().await.unwrap(), 4);
1508    /// # }));
1509    /// # }
1510    /// ```
1511    pub fn max(self) -> Optional<T, L, B>
1512    where
1513        T: Ord,
1514    {
1515        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** max is idempotent */))
1516            .assume_ordering_trusted_bounded::<TotalOrder>(
1517                nondet!(/** max is commutative, but order affects intermediates */),
1518            )
1519            .reduce(q!(|curr, new| {
1520                if new > *curr {
1521                    *curr = new;
1522                }
1523            }))
1524    }
1525
1526    /// Computes the minimum element in the stream as an [`Optional`], which
1527    /// will be empty until the first element in the input arrives.
1528    ///
1529    /// # Example
1530    /// ```rust
1531    /// # #[cfg(feature = "deploy")] {
1532    /// # use hydro_lang::prelude::*;
1533    /// # use futures::StreamExt;
1534    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1535    /// let tick = process.tick();
1536    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1537    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1538    /// batch.min().all_ticks()
1539    /// # }, |mut stream| async move {
1540    /// // 1
1541    /// # assert_eq!(stream.next().await.unwrap(), 1);
1542    /// # }));
1543    /// # }
1544    /// ```
1545    pub fn min(self) -> Optional<T, L, B>
1546    where
1547        T: Ord,
1548    {
1549        self.assume_retries_trusted::<ExactlyOnce>(nondet!(/** min is idempotent */))
1550            .assume_ordering_trusted_bounded::<TotalOrder>(
1551                nondet!(/** max is commutative, but order affects intermediates */),
1552            )
1553            .reduce(q!(|curr, new| {
1554                if new < *curr {
1555                    *curr = new;
1556                }
1557            }))
1558    }
1559
1560    /// Computes the first element in the stream as an [`Optional`], which
1561    /// will be empty until the first element in the input arrives.
1562    ///
1563    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1564    /// re-ordering of elements may cause the first element to change.
1565    ///
1566    /// # Example
1567    /// ```rust
1568    /// # #[cfg(feature = "deploy")] {
1569    /// # use hydro_lang::prelude::*;
1570    /// # use futures::StreamExt;
1571    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1572    /// let tick = process.tick();
1573    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1574    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1575    /// batch.first().all_ticks()
1576    /// # }, |mut stream| async move {
1577    /// // 1
1578    /// # assert_eq!(stream.next().await.unwrap(), 1);
1579    /// # }));
1580    /// # }
1581    /// ```
1582    pub fn first(self) -> Optional<T, L, B>
1583    where
1584        O: IsOrdered,
1585    {
1586        self.make_totally_ordered()
1587            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** first is idempotent */))
1588            .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1589            .reduce(q!(|_, _| {}))
1590    }
1591
1592    /// Computes the last element in the stream as an [`Optional`], which
1593    /// will be empty until an element in the input arrives.
1594    ///
1595    /// This requires the stream to have a [`TotalOrder`] guarantee, otherwise
1596    /// re-ordering of elements may cause the last element to change.
1597    ///
1598    /// # Example
1599    /// ```rust
1600    /// # #[cfg(feature = "deploy")] {
1601    /// # use hydro_lang::prelude::*;
1602    /// # use futures::StreamExt;
1603    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1604    /// let tick = process.tick();
1605    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1606    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1607    /// batch.last().all_ticks()
1608    /// # }, |mut stream| async move {
1609    /// // 4
1610    /// # assert_eq!(stream.next().await.unwrap(), 4);
1611    /// # }));
1612    /// # }
1613    /// ```
1614    pub fn last(self) -> Optional<T, L, B>
1615    where
1616        O: IsOrdered,
1617    {
1618        self.make_totally_ordered()
1619            .assume_retries_trusted::<ExactlyOnce>(nondet!(/** last is idempotent */))
1620            .reduce(q!(|curr, new| *curr = new))
1621    }
1622
1623    /// Returns a stream containing at most the first `n` elements of the input stream,
1624    /// preserving the original order. Similar to `LIMIT` in SQL.
1625    ///
1626    /// This requires the stream to have a [`TotalOrder`] guarantee and [`ExactlyOnce`]
1627    /// retries, since the result depends on the order and cardinality of elements.
1628    ///
1629    /// # Example
1630    /// ```rust
1631    /// # #[cfg(feature = "deploy")] {
1632    /// # use hydro_lang::prelude::*;
1633    /// # use futures::StreamExt;
1634    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1635    /// let numbers = process.source_iter(q!(vec![10, 20, 30, 40, 50]));
1636    /// numbers.limit(q!(3))
1637    /// # }, |mut stream| async move {
1638    /// // 10, 20, 30
1639    /// # for w in vec![10, 20, 30] {
1640    /// #     assert_eq!(stream.next().await.unwrap(), w);
1641    /// # }
1642    /// # }));
1643    /// # }
1644    /// ```
1645    pub fn limit(
1646        self,
1647        n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1648    ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1649    where
1650        O: IsOrdered,
1651        R: IsExactlyOnce,
1652    {
1653        self.generator(
1654            q!(|| 0usize),
1655            q!(move |count, item| {
1656                if *count == n {
1657                    Generate::Break
1658                } else {
1659                    *count += 1;
1660                    if *count == n {
1661                        Generate::Return(item)
1662                    } else {
1663                        Generate::Yield(item)
1664                    }
1665                }
1666            }),
1667        )
1668    }
1669
1670    /// Collects all the elements of this stream into a single [`Vec`] element.
1671    ///
1672    /// If the input stream is [`Unbounded`], the output [`Singleton`] will be [`Unbounded`] as
1673    /// well, which means that the value of the [`Vec`] will asynchronously grow as new elements
1674    /// are added. On such a value, you can use [`Singleton::snapshot`] to grab an instance of
1675    /// the vector at an arbitrary point in time.
1676    ///
1677    /// # Example
1678    /// ```rust
1679    /// # #[cfg(feature = "deploy")] {
1680    /// # use hydro_lang::prelude::*;
1681    /// # use futures::StreamExt;
1682    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1683    /// let tick = process.tick();
1684    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
1685    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1686    /// batch.collect_vec().all_ticks() // emit each tick's Vec into an unbounded stream
1687    /// # }, |mut stream| async move {
1688    /// // [ vec![1, 2, 3, 4] ]
1689    /// # for w in vec![vec![1, 2, 3, 4]] {
1690    /// #     assert_eq!(stream.next().await.unwrap(), w);
1691    /// # }
1692    /// # }));
1693    /// # }
1694    /// ```
1695    pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1696    where
1697        O: IsOrdered,
1698        R: IsExactlyOnce,
1699    {
1700        self.make_totally_ordered().make_exactly_once().fold(
1701            q!(|| vec![]),
1702            q!(|acc, v| {
1703                acc.push(v);
1704            }),
1705        )
1706    }
1707
1708    /// Applies a function to each element of the stream, maintaining an internal state (accumulator)
1709    /// and emitting each intermediate result.
1710    ///
1711    /// Unlike `fold` which only returns the final accumulated value, `scan` produces a new stream
1712    /// containing all intermediate accumulated values. The scan operation can also terminate early
1713    /// by returning `None`.
1714    ///
1715    /// The function takes a mutable reference to the accumulator and the current element, and returns
1716    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
1717    /// If the function returns `None`, the stream is terminated and no more elements are processed.
1718    ///
1719    /// # Examples
1720    ///
1721    /// Basic usage - running sum:
1722    /// ```rust
1723    /// # #[cfg(feature = "deploy")] {
1724    /// # use hydro_lang::prelude::*;
1725    /// # use futures::StreamExt;
1726    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1727    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1728    ///     q!(|| 0),
1729    ///     q!(|acc, x| {
1730    ///         *acc += x;
1731    ///         Some(*acc)
1732    ///     }),
1733    /// )
1734    /// # }, |mut stream| async move {
1735    /// // Output: 1, 3, 6, 10
1736    /// # for w in vec![1, 3, 6, 10] {
1737    /// #     assert_eq!(stream.next().await.unwrap(), w);
1738    /// # }
1739    /// # }));
1740    /// # }
1741    /// ```
1742    ///
1743    /// Early termination example:
1744    /// ```rust
1745    /// # #[cfg(feature = "deploy")] {
1746    /// # use hydro_lang::prelude::*;
1747    /// # use futures::StreamExt;
1748    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1749    /// process.source_iter(q!(vec![1, 2, 3, 4])).scan(
1750    ///     q!(|| 1),
1751    ///     q!(|state, x| {
1752    ///         *state = *state * x;
1753    ///         if *state > 6 {
1754    ///             None // Terminate the stream
1755    ///         } else {
1756    ///             Some(-*state)
1757    ///         }
1758    ///     }),
1759    /// )
1760    /// # }, |mut stream| async move {
1761    /// // Output: -1, -2, -6
1762    /// # for w in vec![-1, -2, -6] {
1763    /// #     assert_eq!(stream.next().await.unwrap(), w);
1764    /// # }
1765    /// # }));
1766    /// # }
1767    /// ```
1768    pub fn scan<A, U, I, F>(
1769        self,
1770        init: impl IntoQuotedMut<'a, I, L>,
1771        f: impl IntoQuotedMut<'a, F, L>,
1772    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1773    where
1774        O: IsOrdered,
1775        R: IsExactlyOnce,
1776        I: Fn() -> A + 'a,
1777        F: Fn(&mut A, T) -> Option<U> + 'a,
1778    {
1779        let init = init.splice_fn0_ctx(&self.location).into();
1780        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1781
1782        Stream::new(
1783            self.location.clone(),
1784            HydroNode::Scan {
1785                init,
1786                acc: f,
1787                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1788                metadata: self.location.new_node_metadata(
1789                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1790                ),
1791            },
1792        )
1793    }
1794
1795    /// Async version of [`Stream::scan`]. Applies an async function to each element of the
1796    /// stream, maintaining an internal state (accumulator) and emitting the values returned
1797    /// by the function.
1798    ///
1799    /// The closure runs synchronously (so it can mutate the accumulator), then returns a
1800    /// future. The future is polled to completion. If it resolves to `Some`, the value is
1801    /// emitted. If it resolves to `None`, the item is filtered out.
1802    ///
1803    /// # Examples
1804    ///
1805    /// ```rust
1806    /// # #[cfg(feature = "deploy")] {
1807    /// # use hydro_lang::prelude::*;
1808    /// # use futures::StreamExt;
1809    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1810    /// process
1811    ///     .source_iter(q!(vec![1, 2, 3, 4]))
1812    ///     .scan_async_blocking(
1813    ///         q!(|| 0),
1814    ///         q!(|acc, x| {
1815    ///             *acc += x;
1816    ///             let val = *acc;
1817    ///             async move { Some(val) }
1818    ///         }),
1819    ///     )
1820    /// # }, |mut stream| async move {
1821    /// // Output: 1, 3, 6, 10
1822    /// # for w in vec![1, 3, 6, 10] {
1823    /// #     assert_eq!(stream.next().await.unwrap(), w);
1824    /// # }
1825    /// # }));
1826    /// # }
1827    /// ```
1828    pub fn scan_async_blocking<A, U, I, F, Fut>(
1829        self,
1830        init: impl IntoQuotedMut<'a, I, L>,
1831        f: impl IntoQuotedMut<'a, F, L>,
1832    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1833    where
1834        O: IsOrdered,
1835        R: IsExactlyOnce,
1836        I: Fn() -> A + 'a,
1837        F: Fn(&mut A, T) -> Fut + 'a,
1838        Fut: Future<Output = Option<U>> + 'a,
1839    {
1840        let init = init.splice_fn0_ctx(&self.location).into();
1841        let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1842
1843        Stream::new(
1844            self.location.clone(),
1845            HydroNode::ScanAsyncBlocking {
1846                init,
1847                acc: f,
1848                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1849                metadata: self.location.new_node_metadata(
1850                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1851                ),
1852            },
1853        )
1854    }
1855
1856    /// Iteratively processes the elements of the stream using a state machine that can yield
1857    /// elements as it processes its inputs. This is designed to mirror the unstable generator
1858    /// syntax in Rust, without requiring special syntax.
1859    ///
1860    /// Like [`Stream::scan`], this function takes in an initializer that emits the initial
1861    /// state. The second argument defines the processing logic, taking in a mutable reference
1862    /// to the state and the value to be processed. It emits a [`Generate`] value, whose
1863    /// variants define what is emitted and whether further inputs should be processed.
1864    ///
1865    /// # Example
1866    /// ```rust
1867    /// # #[cfg(feature = "deploy")] {
1868    /// # use hydro_lang::prelude::*;
1869    /// # use futures::StreamExt;
1870    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1871    /// process.source_iter(q!(vec![1, 3, 100, 10])).generator(
1872    ///     q!(|| 0),
1873    ///     q!(|acc, x| {
1874    ///         *acc += x;
1875    ///         if *acc > 100 {
1876    ///             hydro_lang::live_collections::keyed_stream::Generate::Return("done!".to_owned())
1877    ///         } else if *acc % 2 == 0 {
1878    ///             hydro_lang::live_collections::keyed_stream::Generate::Yield("even".to_owned())
1879    ///         } else {
1880    ///             hydro_lang::live_collections::keyed_stream::Generate::Continue
1881    ///         }
1882    ///     }),
1883    /// )
1884    /// # }, |mut stream| async move {
1885    /// // Output: "even", "done!"
1886    /// # let mut results = Vec::new();
1887    /// # for _ in 0..2 {
1888    /// #     results.push(stream.next().await.unwrap());
1889    /// # }
1890    /// # results.sort();
1891    /// # assert_eq!(results, vec!["done!".to_owned(), "even".to_owned()]);
1892    /// # }));
1893    /// # }
1894    /// ```
1895    pub fn generator<A, U, I, F>(
1896        self,
1897        init: impl IntoQuotedMut<'a, I, L> + Copy,
1898        f: impl IntoQuotedMut<'a, F, L> + Copy,
1899    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1900    where
1901        O: IsOrdered,
1902        R: IsExactlyOnce,
1903        I: Fn() -> A + 'a,
1904        F: Fn(&mut A, T) -> Generate<U> + 'a,
1905    {
1906        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1907        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1908
1909        let this = self.make_totally_ordered().make_exactly_once();
1910
1911        // State is Option<Option<A>>:
1912        //   None = not yet initialized
1913        //   Some(Some(a)) = active with state a
1914        //   Some(None) = terminated
1915        let scan_init = q!(|| None)
1916            .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1917            .into();
1918        let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1919            if state.is_none() {
1920                *state = Some(Some(init()));
1921            }
1922            match state {
1923                Some(Some(state_value)) => match f(state_value, v) {
1924                    Generate::Yield(out) => Some(Some(out)),
1925                    Generate::Return(out) => {
1926                        *state = Some(None);
1927                        Some(Some(out))
1928                    }
1929                    // Unlike KeyedStream, we can terminate the scan directly on
1930                    // Break/Return because there is only one state (no other keys
1931                    // that still need processing).
1932                    Generate::Break => None,
1933                    Generate::Continue => Some(None),
1934                },
1935                // State is Some(None) after Return; terminate the scan.
1936                _ => None,
1937            }
1938        })
1939        .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1940        .into();
1941
1942        let scan_node = HydroNode::Scan {
1943            init: scan_init,
1944            acc: scan_f,
1945            input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1946            metadata: this.location.new_node_metadata(Stream::<
1947                Option<U>,
1948                L,
1949                B,
1950                TotalOrder,
1951                ExactlyOnce,
1952            >::collection_kind()),
1953        };
1954
1955        let flatten_f = q!(|d| d)
1956            .splice_fn1_ctx::<Option<U>, _>(&this.location)
1957            .into();
1958        let flatten_node = HydroNode::FlatMap {
1959            f: flatten_f,
1960            input: Box::new(scan_node),
1961            metadata: this
1962                .location
1963                .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1964        };
1965
1966        Stream::new(this.location.clone(), flatten_node)
1967    }
1968
1969    /// Given a time interval, returns a stream corresponding to samples taken from the
1970    /// stream roughly at that interval. The output will have elements in the same order
1971    /// as the input, but with arbitrary elements skipped between samples. There is also
1972    /// no guarantee on the exact timing of the samples.
1973    ///
1974    /// # Non-Determinism
1975    /// The output stream is non-deterministic in which elements are sampled, since this
1976    /// is controlled by a clock.
1977    #[cfg(feature = "tokio")]
1978    pub fn sample_every(
1979        self,
1980        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1981        nondet: NonDet,
1982    ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1983    where
1984        L: TopLevel<'a>,
1985    {
1986        let samples = self.location.source_interval(interval);
1987
1988        let tick = self.location.tick();
1989        self.batch(&tick, nondet)
1990            .filter_if(samples.batch(&tick, nondet).first().is_some())
1991            .all_ticks()
1992            .weaken_retries()
1993    }
1994
1995    /// Given a timeout duration, returns an [`Optional`]  which will have a value if the
1996    /// stream has not emitted a value since that duration.
1997    ///
1998    /// # Non-Determinism
1999    /// Timeout relies on non-deterministic sampling of the stream, so depending on when
2000    /// samples take place, timeouts may be non-deterministically generated or missed,
2001    /// and the notification of the timeout may be delayed as well. There is also no
2002    /// guarantee on how long the [`Optional`] will have a value after the timeout is
2003    /// detected based on when the next sample is taken.
2004    #[cfg(feature = "tokio")]
2005    pub fn timeout(
2006        self,
2007        duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
2008        nondet: NonDet,
2009    ) -> Optional<(), L::DropConsistency, Unbounded>
2010    where
2011        L: TopLevel<'a>,
2012    {
2013        let tick = self.location.tick();
2014
2015        let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
2016            q!(|| None),
2017            q!(
2018                |latest, _| {
2019                    *latest = Some(Instant::now());
2020                },
2021                commutative = manual_proof!(/** TODO */)
2022            ),
2023        );
2024
2025        latest_received
2026            .snapshot(&tick, nondet)
2027            .filter_map(q!(move |latest_received| {
2028                if let Some(latest_received) = latest_received {
2029                    if Instant::now().duration_since(latest_received) > duration {
2030                        Some(())
2031                    } else {
2032                        None
2033                    }
2034                } else {
2035                    Some(())
2036                }
2037            }))
2038            .latest()
2039    }
2040
2041    /// Shifts this stream into an atomic context, which guarantees that any downstream logic
2042    /// will all be executed synchronously before any outputs are yielded (in [`Stream::end_atomic`]).
2043    ///
2044    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
2045    /// processed before an acknowledgement is emitted.
2046    pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
2047        let id = self.location.flow_state().borrow_mut().next_clock_id();
2048        let out_location = Atomic {
2049            tick: Tick {
2050                id,
2051                l: self.location.clone(),
2052            },
2053        };
2054        Stream::new(
2055            out_location.clone(),
2056            HydroNode::BeginAtomic {
2057                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2058                metadata: out_location
2059                    .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2060            },
2061        )
2062    }
2063
2064    /// Given a tick, returns a stream corresponding to a batch of elements segmented by
2065    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
2066    /// the order of the input. The output stream will execute in the [`Tick`] that was
2067    /// used to create the atomic section.
2068    ///
2069    /// # Non-Determinism
2070    /// The batch boundaries are non-deterministic and may change across executions.
2071    pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2072        self,
2073        tick: &Tick<L2>,
2074        _nondet: NonDet,
2075    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2076        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2077        Stream::new(
2078            tick.drop_consistency(),
2079            HydroNode::Batch {
2080                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2081                metadata: tick
2082                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2083            },
2084        )
2085    }
2086
2087    /// An operator which allows you to "name" a `HydroNode`.
2088    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
2089    pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2090        {
2091            let mut node = self.ir_node.borrow_mut();
2092            let metadata = node.metadata_mut();
2093            metadata.tag = Some(name.to_owned());
2094        }
2095        self
2096    }
2097
2098    /// Turns this [`Stream`] into a [`Optional`], under the invariant assumption that there is at
2099    /// most one element. If this invariant is broken, the program may exhibit undefined behavior,
2100    /// so uses must be carefully vetted.
2101    pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2102    where
2103        B: IsBounded,
2104    {
2105        Optional::new(
2106            self.location.clone(),
2107            HydroNode::Cast {
2108                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2109                metadata: self
2110                    .location
2111                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2112            },
2113        )
2114    }
2115
2116    pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2117        if O::ORDERING_KIND == O2::ORDERING_KIND {
2118            Stream::new(
2119                self.location.clone(),
2120                self.ir_node.replace(HydroNode::Placeholder),
2121            )
2122        } else {
2123            panic!(
2124                "Runtime ordering {:?} did not match requested cast {:?}.",
2125                O::ORDERING_KIND,
2126                O2::ORDERING_KIND
2127            )
2128        }
2129    }
2130
2131    /// Explicitly "casts" the stream to a type with a different ordering
2132    /// guarantee. Useful in unsafe code where the ordering cannot be proven
2133    /// by the type-system.
2134    ///
2135    /// # Non-Determinism
2136    /// This function is used as an escape hatch, and any mistakes in the
2137    /// provided ordering guarantee will propagate into the guarantees
2138    /// for the rest of the program.
2139    pub fn assume_ordering<O2: Ordering>(
2140        self,
2141        _nondet: NonDet,
2142    ) -> Stream<T, L::DropConsistency, B, O2, R> {
2143        if O::ORDERING_KIND == O2::ORDERING_KIND {
2144            self.use_ordering_type().weaken_consistency()
2145        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2146            // We can always weaken the ordering guarantee
2147            let target_location = self.location().drop_consistency();
2148            Stream::new(
2149                target_location.clone(),
2150                HydroNode::Cast {
2151                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2152                    metadata: target_location
2153                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2154                },
2155            )
2156        } else {
2157            let target_location = self.location().drop_consistency();
2158            Stream::new(
2159                target_location.clone(),
2160                HydroNode::ObserveNonDet {
2161                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2162                    trusted: false,
2163                    metadata: target_location
2164                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2165                },
2166            )
2167        }
2168    }
2169
2170    // like `assume_ordering_trusted`, but only if the input stream is bounded and therefore
2171    // intermediate states will not be revealed
2172    fn assume_ordering_trusted_bounded<O2: Ordering>(
2173        self,
2174        nondet: NonDet,
2175    ) -> Stream<T, L, B, O2, R> {
2176        if B::BOUNDED {
2177            self.assume_ordering_trusted(nondet)
2178        } else {
2179            let self_location = self.location.clone();
2180            let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2181            Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2182        }
2183    }
2184
2185    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2186    // is not observable
2187    pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2188        self,
2189        _nondet: NonDet,
2190    ) -> Stream<T, L, B, O2, R> {
2191        if O::ORDERING_KIND == O2::ORDERING_KIND {
2192            self.use_ordering_type()
2193        } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2194            // We can always weaken the ordering guarantee
2195            Stream::new(
2196                self.location.clone(),
2197                HydroNode::Cast {
2198                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2199                    metadata: self
2200                        .location
2201                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2202                },
2203            )
2204        } else {
2205            Stream::new(
2206                self.location.clone(),
2207                HydroNode::ObserveNonDet {
2208                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2209                    trusted: true,
2210                    metadata: self
2211                        .location
2212                        .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2213                },
2214            )
2215        }
2216    }
2217
2218    #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2219    /// Weakens the ordering guarantee provided by the stream to [`NoOrder`],
2220    /// which is always safe because that is the weakest possible guarantee.
2221    pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2222        self.weaken_ordering::<NoOrder>()
2223    }
2224
2225    /// Weakens the ordering guarantee provided by the stream to `O2`, with the type-system
2226    /// enforcing that `O2` is weaker than the input ordering guarantee.
2227    pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2228        let nondet = nondet!(/** this is a weaker ordering guarantee, so it is safe to assume */);
2229        self.assume_ordering_trusted::<O2>(nondet)
2230    }
2231
2232    /// Strengthens the ordering guarantee to `TotalOrder`, given that `O: IsOrdered`, which
2233    /// implies that `O == TotalOrder`.
2234    pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2235    where
2236        O: IsOrdered,
2237    {
2238        self.assume_ordering_trusted(nondet!(/** no-op */))
2239    }
2240
2241    /// Explicitly "casts" the stream to a type with a different retries
2242    /// guarantee. Useful in unsafe code where the lack of retries cannot
2243    /// be proven by the type-system.
2244    ///
2245    /// # Non-Determinism
2246    /// This function is used as an escape hatch, and any mistakes in the
2247    /// provided retries guarantee will propagate into the guarantees
2248    /// for the rest of the program.
2249    pub fn assume_retries<R2: Retries>(
2250        self,
2251        _nondet: NonDet,
2252    ) -> Stream<T, L::DropConsistency, B, O, R2> {
2253        if R::RETRIES_KIND == R2::RETRIES_KIND {
2254            Stream::new(
2255                self.location.drop_consistency(),
2256                self.ir_node.replace(HydroNode::Placeholder),
2257            )
2258        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2259            // We can always weaken the retries guarantee
2260            let target_location = self.location.drop_consistency();
2261            Stream::new(
2262                target_location.clone(),
2263                HydroNode::Cast {
2264                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2265                    metadata: target_location
2266                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2267                },
2268            )
2269        } else {
2270            let target_location = self.location.drop_consistency();
2271            Stream::new(
2272                target_location.clone(),
2273                HydroNode::ObserveNonDet {
2274                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2275                    trusted: false,
2276                    metadata: target_location
2277                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2278                },
2279            )
2280        }
2281    }
2282
2283    // only for internal APIs that have been carefully vetted to ensure that the non-determinism
2284    // is not observable
2285    fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2286        if R::RETRIES_KIND == R2::RETRIES_KIND {
2287            Stream::new(
2288                self.location.clone(),
2289                self.ir_node.replace(HydroNode::Placeholder),
2290            )
2291        } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2292            // We can always weaken the retries guarantee
2293            Stream::new(
2294                self.location.clone(),
2295                HydroNode::Cast {
2296                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2297                    metadata: self
2298                        .location
2299                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2300                },
2301            )
2302        } else {
2303            Stream::new(
2304                self.location.clone(),
2305                HydroNode::ObserveNonDet {
2306                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2307                    trusted: true,
2308                    metadata: self
2309                        .location
2310                        .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2311                },
2312            )
2313        }
2314    }
2315
2316    #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2317    /// Weakens the retries guarantee provided by the stream to [`AtLeastOnce`],
2318    /// which is always safe because that is the weakest possible guarantee.
2319    pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2320        self.weaken_retries::<AtLeastOnce>()
2321    }
2322
2323    /// Weakens the retries guarantee provided by the stream to `R2`, with the type-system
2324    /// enforcing that `R2` is weaker than the input retries guarantee.
2325    pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2326        let nondet = nondet!(/** this is a weaker retry guarantee, so it is safe to assume */);
2327        self.assume_retries_trusted::<R2>(nondet)
2328    }
2329
2330    /// Strengthens the retry guarantee to `ExactlyOnce`, given that `R: IsExactlyOnce`, which
2331    /// implies that `R == ExactlyOnce`.
2332    pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2333    where
2334        R: IsExactlyOnce,
2335    {
2336        self.assume_retries_trusted(nondet!(/** no-op */))
2337    }
2338
2339    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
2340    /// implies that `B == Bounded`.
2341    pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2342    where
2343        B: IsBounded,
2344    {
2345        self.weaken_boundedness()
2346    }
2347
2348    /// Weakens the boundedness guarantee to an arbitrary boundedness `B2`, given that `B: IsBounded`,
2349    /// which implies that `B == Bounded`.
2350    pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2351        if B::BOUNDED == B2::BOUNDED {
2352            Stream::new(
2353                self.location.clone(),
2354                self.ir_node.replace(HydroNode::Placeholder),
2355            )
2356        } else {
2357            // We can always weaken the boundedness
2358            Stream::new(
2359                self.location.clone(),
2360                HydroNode::Cast {
2361                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2362                    metadata: self
2363                        .location
2364                        .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2365                },
2366            )
2367        }
2368    }
2369}
2370
2371impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2372where
2373    L: Location<'a>,
2374{
2375    /// Clone each element of the stream; akin to `map(q!(|d| d.clone()))`.
2376    ///
2377    /// # Example
2378    /// ```rust
2379    /// # #[cfg(feature = "deploy")] {
2380    /// # use hydro_lang::prelude::*;
2381    /// # use futures::StreamExt;
2382    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2383    /// process.source_iter(q!(&[1, 2, 3])).cloned()
2384    /// # }, |mut stream| async move {
2385    /// // 1, 2, 3
2386    /// # for w in vec![1, 2, 3] {
2387    /// #     assert_eq!(stream.next().await.unwrap(), w);
2388    /// # }
2389    /// # }));
2390    /// # }
2391    /// ```
2392    pub fn cloned(self) -> Stream<T, L, B, O, R>
2393    where
2394        T: Clone,
2395    {
2396        self.map(q!(|d| d.clone()))
2397    }
2398}
2399
2400impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2401where
2402    L: Location<'a>,
2403{
2404    /// Computes the number of elements in the stream as a [`Singleton`].
2405    ///
2406    /// # Example
2407    /// ```rust
2408    /// # #[cfg(feature = "deploy")] {
2409    /// # use hydro_lang::prelude::*;
2410    /// # use futures::StreamExt;
2411    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2412    /// let tick = process.tick();
2413    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2414    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2415    /// batch.count().all_ticks()
2416    /// # }, |mut stream| async move {
2417    /// // 4
2418    /// # assert_eq!(stream.next().await.unwrap(), 4);
2419    /// # }));
2420    /// # }
2421    /// ```
2422    pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2423        self.assume_ordering_trusted::<TotalOrder>(nondet!(
2424            /// Order does not affect eventual count, and also does not affect intermediate states.
2425        ))
2426        .fold(
2427            q!(|| 0usize),
2428            q!(
2429                |count, _| *count += 1,
2430                monotone = manual_proof!(/** += 1 is monotone */)
2431            ),
2432        )
2433    }
2434}
2435
2436impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2437    /// Produces a new stream that merges the elements of the two input streams.
2438    /// The result has [`NoOrder`] because the order of merging is not guaranteed.
2439    ///
2440    /// Currently, both input streams must be [`Unbounded`]. When the streams are
2441    /// [`Bounded`], you can use [`Stream::chain`] instead.
2442    ///
2443    /// # Example
2444    /// ```rust
2445    /// # #[cfg(feature = "deploy")] {
2446    /// # use hydro_lang::prelude::*;
2447    /// # use futures::StreamExt;
2448    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2449    /// let numbers: Stream<i32, _, Unbounded> = // 1, 2, 3, 4
2450    /// # process.source_iter(q!(vec![1, 2, 3, 4])).into();
2451    /// numbers.clone().map(q!(|x| x + 1)).merge_unordered(numbers)
2452    /// # }, |mut stream| async move {
2453    /// // 2, 3, 4, 5, and 1, 2, 3, 4 merged in unknown order
2454    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2455    /// #     assert_eq!(stream.next().await.unwrap(), w);
2456    /// # }
2457    /// # }));
2458    /// # }
2459    /// ```
2460    pub fn merge_unordered<O2: Ordering, R2: Retries>(
2461        self,
2462        other: Stream<T, L, Unbounded, O2, R2>,
2463    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2464    where
2465        R: MinRetries<R2>,
2466    {
2467        Stream::new(
2468            self.location.clone(),
2469            HydroNode::Chain {
2470                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2471                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2472                metadata: self.location.new_node_metadata(Stream::<
2473                    T,
2474                    L,
2475                    Unbounded,
2476                    NoOrder,
2477                    <R as MinRetries<R2>>::Min,
2478                >::collection_kind()),
2479            },
2480        )
2481    }
2482
2483    /// Deprecated: use [`Stream::merge_unordered`] instead.
2484    #[deprecated(note = "use `merge_unordered` instead")]
2485    pub fn interleave<O2: Ordering, R2: Retries>(
2486        self,
2487        other: Stream<T, L, Unbounded, O2, R2>,
2488    ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2489    where
2490        R: MinRetries<R2>,
2491    {
2492        self.merge_unordered(other)
2493    }
2494}
2495
2496impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2497    /// Produces a new stream that combines the elements of the two input streams,
2498    /// preserving the relative order of elements within each input.
2499    ///
2500    /// # Non-Determinism
2501    /// The order in which elements *across* the two streams will be interleaved is
2502    /// non-deterministic, so the order of elements will vary across runs. If the output
2503    /// order is irrelevant, use [`Stream::merge_unordered`] instead, which is deterministic
2504    /// but emits an unordered stream. For deterministic first-then-second ordering on
2505    /// bounded streams, use [`Stream::chain`].
2506    ///
2507    /// # Example
2508    /// ```rust
2509    /// # #[cfg(feature = "deploy")] {
2510    /// # use hydro_lang::prelude::*;
2511    /// # use futures::StreamExt;
2512    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2513    /// let numbers: Stream<i32, _, Unbounded> = // 1, 3
2514    /// # process.source_iter(q!(vec![1, 3])).into();
2515    /// numbers.clone().merge_ordered(numbers.map(q!(|x| x + 1)), nondet!(/** example */))
2516    /// # }, |mut stream| async move {
2517    /// // 1, 3 and 2, 4 in some order, preserving the original local order
2518    /// # for w in vec![1, 3, 2, 4] {
2519    /// #     assert_eq!(stream.next().await.unwrap(), w);
2520    /// # }
2521    /// # }));
2522    /// # }
2523    /// ```
2524    pub fn merge_ordered<R2: Retries>(
2525        self,
2526        other: Stream<T, L, B, TotalOrder, R2>,
2527        _nondet: NonDet,
2528    ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2529    where
2530        R: MinRetries<R2>,
2531    {
2532        let target_location = self.location().drop_consistency();
2533        Stream::new(
2534            target_location.clone(),
2535            HydroNode::MergeOrdered {
2536                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2537                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2538                metadata: target_location.new_node_metadata(Stream::<
2539                    T,
2540                    L::DropConsistency,
2541                    B,
2542                    TotalOrder,
2543                    <R as MinRetries<R2>>::Min,
2544                >::collection_kind()),
2545            },
2546        )
2547    }
2548}
2549
2550impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2551where
2552    L: Location<'a>,
2553{
2554    /// Produces a new stream that emits the input elements in sorted order.
2555    ///
2556    /// The input stream can have any ordering guarantee, but the output stream
2557    /// will have a [`TotalOrder`] guarantee. This operator will block until all
2558    /// elements in the input stream are available, so it requires the input stream
2559    /// to be [`Bounded`].
2560    ///
2561    /// # Example
2562    /// ```rust
2563    /// # #[cfg(feature = "deploy")] {
2564    /// # use hydro_lang::prelude::*;
2565    /// # use futures::StreamExt;
2566    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2567    /// let tick = process.tick();
2568    /// let numbers = process.source_iter(q!(vec![4, 2, 3, 1]));
2569    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2570    /// batch.sort().all_ticks()
2571    /// # }, |mut stream| async move {
2572    /// // 1, 2, 3, 4
2573    /// # for w in (1..5) {
2574    /// #     assert_eq!(stream.next().await.unwrap(), w);
2575    /// # }
2576    /// # }));
2577    /// # }
2578    /// ```
2579    pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2580    where
2581        B: IsBounded,
2582        T: Ord,
2583    {
2584        let this = self.make_bounded();
2585        Stream::new(
2586            this.location.clone(),
2587            HydroNode::Sort {
2588                input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2589                metadata: this
2590                    .location
2591                    .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2592            },
2593        )
2594    }
2595
2596    /// Produces a new stream that first emits the elements of the `self` stream,
2597    /// and then emits the elements of the `other` stream. The output stream has
2598    /// a [`TotalOrder`] guarantee if and only if both input streams have a
2599    /// [`TotalOrder`] guarantee.
2600    ///
2601    /// Currently, both input streams must be [`Bounded`]. This operator will block
2602    /// on the first stream until all its elements are available. In a future version,
2603    /// we will relax the requirement on the `other` stream.
2604    ///
2605    /// # Example
2606    /// ```rust
2607    /// # #[cfg(feature = "deploy")] {
2608    /// # use hydro_lang::prelude::*;
2609    /// # use futures::StreamExt;
2610    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2611    /// let tick = process.tick();
2612    /// let numbers = process.source_iter(q!(vec![1, 2, 3, 4]));
2613    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2614    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
2615    /// # }, |mut stream| async move {
2616    /// // 2, 3, 4, 5, 1, 2, 3, 4
2617    /// # for w in vec![2, 3, 4, 5, 1, 2, 3, 4] {
2618    /// #     assert_eq!(stream.next().await.unwrap(), w);
2619    /// # }
2620    /// # }));
2621    /// # }
2622    /// ```
2623    pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2624        self,
2625        other: Stream<T, L, B2, O2, R2>,
2626    ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2627    where
2628        B: IsBounded,
2629        O: MinOrder<O2>,
2630        R: MinRetries<R2>,
2631    {
2632        check_matching_location(&self.location, &other.location);
2633
2634        Stream::new(
2635            self.location.clone(),
2636            HydroNode::Chain {
2637                first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2638                second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2639                metadata: self.location.new_node_metadata(Stream::<
2640                    T,
2641                    L,
2642                    B2,
2643                    <O as MinOrder<O2>>::Min,
2644                    <R as MinRetries<R2>>::Min,
2645                >::collection_kind()),
2646            },
2647        )
2648    }
2649
2650    /// Forms the cross-product (Cartesian product, cross-join) of the items in the 2 input streams.
2651    /// Unlike [`Stream::cross_product`], the output order is totally ordered when the inputs are
2652    /// because this is compiled into a nested loop.
2653    pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>, R2: Retries>(
2654        self,
2655        other: Stream<T2, L, Bounded, O2, R2>,
2656    ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, <R as MinRetries<R2>>::Min>
2657    where
2658        B: IsBounded,
2659        T: Clone,
2660        T2: Clone,
2661        R: MinRetries<R2>,
2662    {
2663        let this = self.make_bounded();
2664        check_matching_location(&this.location, &other.location);
2665
2666        Stream::new(
2667            this.location.clone(),
2668            HydroNode::CrossProduct {
2669                left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2670                right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2671                metadata: this.location.new_node_metadata(Stream::<
2672                    (T, T2),
2673                    L,
2674                    Bounded,
2675                    <O2 as MinOrder<O>>::Min,
2676                    <R as MinRetries<R2>>::Min,
2677                >::collection_kind()),
2678            },
2679        )
2680    }
2681
2682    /// Creates a [`KeyedStream`] with the same set of keys as `keys`, but with the elements in
2683    /// `self` used as the values for *each* key.
2684    ///
2685    /// This is helpful when "broadcasting" a set of values so that all the keys have the same
2686    /// values. For example, it can be used to send the same set of elements to several cluster
2687    /// members, if the membership information is available as a [`KeyedSingleton`].
2688    ///
2689    /// # Example
2690    /// ```rust
2691    /// # #[cfg(feature = "deploy")] {
2692    /// # use hydro_lang::prelude::*;
2693    /// # use futures::StreamExt;
2694    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2695    /// # let tick = process.tick();
2696    /// let keyed_singleton = // { 1: (), 2: () }
2697    /// # process
2698    /// #     .source_iter(q!(vec![(1, ()), (2, ())]))
2699    /// #     .into_keyed()
2700    /// #     .batch(&tick, nondet!(/** test */))
2701    /// #     .first();
2702    /// let stream = // [ "a", "b" ]
2703    /// # process
2704    /// #     .source_iter(q!(vec!["a".to_owned(), "b".to_owned()]))
2705    /// #     .batch(&tick, nondet!(/** test */));
2706    /// stream.repeat_with_keys(keyed_singleton)
2707    /// # .entries().all_ticks()
2708    /// # }, |mut stream| async move {
2709    /// // { 1: ["a", "b" ], 2: ["a", "b"] }
2710    /// # let mut results = Vec::new();
2711    /// # for _ in 0..4 {
2712    /// #     results.push(stream.next().await.unwrap());
2713    /// # }
2714    /// # results.sort();
2715    /// # assert_eq!(results, vec![(1, "a".to_owned()), (1, "b".to_owned()), (2, "a".to_owned()), (2, "b".to_owned())]);
2716    /// # }));
2717    /// # }
2718    /// ```
2719    pub fn repeat_with_keys<K, V2>(
2720        self,
2721        keys: KeyedSingleton<K, V2, L, Bounded>,
2722    ) -> KeyedStream<K, T, L, Bounded, O, R>
2723    where
2724        B: IsBounded,
2725        K: Clone,
2726        T: Clone,
2727    {
2728        keys.keys()
2729            .assume_ordering_trusted::<TotalOrder>(
2730                nondet!(/** keyed stream does not depend on ordering of keys */),
2731            )
2732            .cross_product_nested_loop(self.make_bounded())
2733            .into_keyed()
2734    }
2735
2736    /// Consumes a stream of `Future<T>`, resolving each future while blocking subgraph
2737    /// execution until all results are available. The output order is based on when futures
2738    /// complete, and may be different than the input order.
2739    ///
2740    /// Unlike [`Stream::resolve_futures`], which allows the subgraph to continue executing
2741    /// while futures are pending, this variant blocks until the futures resolve.
2742    ///
2743    /// # Example
2744    /// ```rust
2745    /// # #[cfg(feature = "deploy")] {
2746    /// # use std::collections::HashSet;
2747    /// # use futures::StreamExt;
2748    /// # use hydro_lang::prelude::*;
2749    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2750    /// process
2751    ///     .source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
2752    ///     .map(q!(|x| async move {
2753    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
2754    ///         x
2755    ///     }))
2756    ///     .resolve_futures_blocking()
2757    /// #   },
2758    /// #   |mut stream| async move {
2759    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
2760    /// #       let mut output = HashSet::new();
2761    /// #       for _ in 1..10 {
2762    /// #           output.insert(stream.next().await.unwrap());
2763    /// #       }
2764    /// #       assert_eq!(
2765    /// #           output,
2766    /// #           HashSet::<i32>::from_iter(1..10)
2767    /// #       );
2768    /// #   },
2769    /// # ));
2770    /// # }
2771    /// ```
2772    pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2773    where
2774        T: Future,
2775    {
2776        Stream::new(
2777            self.location.clone(),
2778            HydroNode::ResolveFuturesBlocking {
2779                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2780                metadata: self
2781                    .location
2782                    .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2783            },
2784        )
2785    }
2786
2787    /// Returns a [`Singleton`] containing `true` if the stream has no elements, or `false` otherwise.
2788    ///
2789    /// # Example
2790    /// ```rust
2791    /// # #[cfg(feature = "deploy")] {
2792    /// # use hydro_lang::prelude::*;
2793    /// # use futures::StreamExt;
2794    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2795    /// let tick = process.tick();
2796    /// let empty: Stream<i32, _, Bounded> = process
2797    ///   .source_iter(q!(Vec::<i32>::new()))
2798    ///   .batch(&tick, nondet!(/** test */));
2799    /// empty.is_empty().all_ticks()
2800    /// # }, |mut stream| async move {
2801    /// // true
2802    /// # assert_eq!(stream.next().await.unwrap(), true);
2803    /// # }));
2804    /// # }
2805    /// ```
2806    #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2807    pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2808    where
2809        B: IsBounded,
2810    {
2811        self.make_bounded()
2812            .assume_ordering_trusted::<TotalOrder>(
2813                nondet!(/** is_empty intermediates unaffected by order */),
2814            )
2815            .first()
2816            .is_none()
2817    }
2818}
2819
2820impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2821where
2822    L: Location<'a>,
2823{
2824    /// Given two streams of pairs `(K, V1)` and `(K, V2)`, produces a new stream of nested pairs `(K, (V1, V2))`
2825    /// by equi-joining the two streams on the key attribute `K`.
2826    ///
2827    /// When the right-hand side is [`Bounded`], the join accumulates the right side first
2828    /// and streams the left side through, preserving the left side's ordering. When both
2829    /// sides are [`Unbounded`], a symmetric hash join is used and ordering is [`NoOrder`].
2830    ///
2831    /// # Example
2832    /// ```rust
2833    /// # #[cfg(feature = "deploy")] {
2834    /// # use hydro_lang::prelude::*;
2835    /// # use std::collections::HashSet;
2836    /// # use futures::StreamExt;
2837    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2838    /// let tick = process.tick();
2839    /// let stream1 = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
2840    /// let stream2 = process.source_iter(q!(vec![(1, 'x'), (2, 'y')]));
2841    /// stream1.join(stream2)
2842    /// # }, |mut stream| async move {
2843    /// // (1, ('a', 'x')), (2, ('b', 'y'))
2844    /// # let expected = HashSet::from([(1, ('a', 'x')), (2, ('b', 'y'))]);
2845    /// # stream.map(|i| assert!(expected.contains(&i)));
2846    /// # }));
2847    /// # }
2848    pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2849        self,
2850        n: Stream<(K, V2), L, B2, O2, R2>,
2851    ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2852    where
2853        K: Eq + Hash + Clone,
2854        R: MinRetries<R2>,
2855        V1: Clone,
2856        V2: Clone,
2857    {
2858        check_matching_location(&self.location, &n.location);
2859
2860        let ir_node = if B2::BOUNDED {
2861            HydroNode::JoinHalf {
2862                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2863                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2864                metadata: self.location.new_node_metadata(Stream::<
2865                    (K, (V1, V2)),
2866                    L,
2867                    B,
2868                    B2::PreserveOrderIfBounded<O>,
2869                    <R as MinRetries<R2>>::Min,
2870                >::collection_kind()),
2871            }
2872        } else {
2873            HydroNode::Join {
2874                left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2875                right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2876                metadata: self.location.new_node_metadata(Stream::<
2877                    (K, (V1, V2)),
2878                    L,
2879                    B,
2880                    B2::PreserveOrderIfBounded<O>,
2881                    <R as MinRetries<R2>>::Min,
2882                >::collection_kind()),
2883            }
2884        };
2885
2886        Stream::new(self.location.clone(), ir_node)
2887    }
2888
2889    /// Given a stream of pairs `(K, V1)` and a bounded stream of keys `K`,
2890    /// computes the anti-join of the items in the input -- i.e. returns
2891    /// unique items in the first input that do not have a matching key
2892    /// in the second input.
2893    ///
2894    /// # Example
2895    /// ```rust
2896    /// # #[cfg(feature = "deploy")] {
2897    /// # use hydro_lang::prelude::*;
2898    /// # use futures::StreamExt;
2899    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2900    /// let tick = process.tick();
2901    /// let stream = process
2902    ///   .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
2903    ///   .batch(&tick, nondet!(/** test */));
2904    /// let batch = process
2905    ///   .source_iter(q!(vec![1, 2]))
2906    ///   .batch(&tick, nondet!(/** test */));
2907    /// stream.anti_join(batch).all_ticks()
2908    /// # }, |mut stream| async move {
2909    /// # for w in vec![(3, 'c'), (4, 'd')] {
2910    /// #     assert_eq!(stream.next().await.unwrap(), w);
2911    /// # }
2912    /// # }));
2913    /// # }
2914    pub fn anti_join<O2: Ordering, R2: Retries>(
2915        self,
2916        n: Stream<K, L, Bounded, O2, R2>,
2917    ) -> Stream<(K, V1), L, B, O, R>
2918    where
2919        K: Eq + Hash,
2920    {
2921        check_matching_location(&self.location, &n.location);
2922
2923        Stream::new(
2924            self.location.clone(),
2925            HydroNode::AntiJoin {
2926                pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2927                neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2928                metadata: self
2929                    .location
2930                    .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2931            },
2932        )
2933    }
2934}
2935
2936impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2937    Stream<(K, V), L, B, O, R>
2938{
2939    /// Transforms this stream into a [`KeyedStream`], where the first element of each tuple
2940    /// is used as the key and the second element is added to the entries associated with that key.
2941    ///
2942    /// Because [`KeyedStream`] lazily groups values into buckets, this operator has zero computational
2943    /// cost and _does not_ require that the key type is hashable. Keyed streams are useful for
2944    /// performing grouped aggregations, but also for more precise ordering guarantees such as
2945    /// total ordering _within_ each group but no ordering _across_ groups.
2946    ///
2947    /// # Example
2948    /// ```rust
2949    /// # #[cfg(feature = "deploy")] {
2950    /// # use hydro_lang::prelude::*;
2951    /// # use futures::StreamExt;
2952    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2953    /// process
2954    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
2955    ///     .into_keyed()
2956    /// #   .entries()
2957    /// # }, |mut stream| async move {
2958    /// // { 1: [2, 3], 2: [4] }
2959    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
2960    /// #     assert_eq!(stream.next().await.unwrap(), w);
2961    /// # }
2962    /// # }));
2963    /// # }
2964    /// ```
2965    pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2966        KeyedStream::new(
2967            self.location.clone(),
2968            HydroNode::Cast {
2969                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2970                metadata: self
2971                    .location
2972                    .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2973            },
2974        )
2975    }
2976}
2977
2978impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2979where
2980    K: Eq + Hash,
2981    L: Location<'a>,
2982{
2983    /// Given a stream of pairs `(K, V)`, produces a new stream of unique keys `K`.
2984    /// # Example
2985    /// ```rust
2986    /// # #[cfg(feature = "deploy")] {
2987    /// # use hydro_lang::prelude::*;
2988    /// # use futures::StreamExt;
2989    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
2990    /// let tick = process.tick();
2991    /// let numbers = process.source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]));
2992    /// let batch = numbers.batch(&tick, nondet!(/** test */));
2993    /// batch.keys().all_ticks()
2994    /// # }, |mut stream| async move {
2995    /// // 1, 2
2996    /// # assert_eq!(stream.next().await.unwrap(), 1);
2997    /// # assert_eq!(stream.next().await.unwrap(), 2);
2998    /// # }));
2999    /// # }
3000    /// ```
3001    pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
3002        self.into_keyed()
3003            .fold(
3004                q!(|| ()),
3005                q!(
3006                    |_, _| {},
3007                    commutative = manual_proof!(/** values are ignored */),
3008                    idempotent = manual_proof!(/** values are ignored */)
3009                ),
3010            )
3011            .keys()
3012    }
3013}
3014
3015impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
3016where
3017    L: Location<'a>,
3018{
3019    /// Returns a stream corresponding to the latest batch of elements being atomically
3020    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
3021    /// the order of the input.
3022    ///
3023    /// # Non-Determinism
3024    /// The batch boundaries are non-deterministic and may change across executions.
3025    pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
3026        self,
3027        tick: &Tick<L2>,
3028        _nondet: NonDet,
3029    ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
3030        Stream::new(
3031            tick.drop_consistency(),
3032            HydroNode::Batch {
3033                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3034                metadata: tick
3035                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3036            },
3037        )
3038    }
3039
3040    /// Yields the elements of this stream back into a top-level, asynchronous execution context.
3041    /// See [`Stream::atomic`] for more details.
3042    pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
3043        Stream::new(
3044            self.location.tick.l.clone(),
3045            HydroNode::EndAtomic {
3046                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3047                metadata: self
3048                    .location
3049                    .tick
3050                    .l
3051                    .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
3052            },
3053        )
3054    }
3055}
3056
3057impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
3058where
3059    L: TopLevel<'a>,
3060    F: Future<Output = T>,
3061{
3062    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3063    /// Future outputs are produced as available, regardless of input arrival order.
3064    ///
3065    /// # Example
3066    /// ```rust
3067    /// # #[cfg(feature = "deploy")] {
3068    /// # use std::collections::HashSet;
3069    /// # use futures::StreamExt;
3070    /// # use hydro_lang::prelude::*;
3071    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3072    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3073    ///     .map(q!(|x| async move {
3074    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3075    ///         x
3076    ///     }))
3077    ///     .resolve_futures()
3078    /// #   },
3079    /// #   |mut stream| async move {
3080    /// // 1, 2, 3, 4, 5, 6, 7, 8, 9 (in any order)
3081    /// #       let mut output = HashSet::new();
3082    /// #       for _ in 1..10 {
3083    /// #           output.insert(stream.next().await.unwrap());
3084    /// #       }
3085    /// #       assert_eq!(
3086    /// #           output,
3087    /// #           HashSet::<i32>::from_iter(1..10)
3088    /// #       );
3089    /// #   },
3090    /// # ));
3091    /// # }
3092    pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3093        Stream::new(
3094            self.location.clone(),
3095            HydroNode::ResolveFutures {
3096                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3097                metadata: self
3098                    .location
3099                    .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3100            },
3101        )
3102    }
3103
3104    /// Consumes a stream of `Future<T>`, produces a new stream of the resulting `T` outputs.
3105    /// Future outputs are produced in the same order as the input stream.
3106    ///
3107    /// # Example
3108    /// ```rust
3109    /// # #[cfg(feature = "deploy")] {
3110    /// # use std::collections::HashSet;
3111    /// # use futures::StreamExt;
3112    /// # use hydro_lang::prelude::*;
3113    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3114    /// process.source_iter(q!([2, 3, 1, 9, 6, 5, 4, 7, 8]))
3115    ///     .map(q!(|x| async move {
3116    ///         tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
3117    ///         x
3118    ///     }))
3119    ///     .resolve_futures_ordered()
3120    /// #   },
3121    /// #   |mut stream| async move {
3122    /// // 2, 3, 1, 9, 6, 5, 4, 7, 8
3123    /// #       let mut output = Vec::new();
3124    /// #       for _ in 1..10 {
3125    /// #           output.push(stream.next().await.unwrap());
3126    /// #       }
3127    /// #       assert_eq!(
3128    /// #           output,
3129    /// #           vec![2, 3, 1, 9, 6, 5, 4, 7, 8]
3130    /// #       );
3131    /// #   },
3132    /// # ));
3133    /// # }
3134    pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3135        Stream::new(
3136            self.location.clone(),
3137            HydroNode::ResolveFuturesOrdered {
3138                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3139                metadata: self
3140                    .location
3141                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3142            },
3143        )
3144    }
3145}
3146
3147impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3148where
3149    L: Location<'a>,
3150{
3151    /// Asynchronously yields this batch of elements outside the tick as an unbounded stream,
3152    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3153    pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3154        Stream::new(
3155            self.location.outer().clone(),
3156            HydroNode::YieldConcat {
3157                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3158                metadata: self
3159                    .location
3160                    .outer()
3161                    .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3162            },
3163        )
3164    }
3165
3166    /// Synchronously yields this batch of elements outside the tick as an unbounded stream,
3167    /// which will stream all the elements across _all_ tick iterations by concatenating the batches.
3168    ///
3169    /// Unlike [`Stream::all_ticks`], this preserves synchronous execution, as the output stream
3170    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
3171    /// stream's [`Tick`] context.
3172    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3173        let out_location = Atomic {
3174            tick: self.location.clone(),
3175        };
3176
3177        Stream::new(
3178            out_location.clone(),
3179            HydroNode::YieldConcat {
3180                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3181                metadata: out_location
3182                    .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3183            },
3184        )
3185    }
3186
3187    /// Transforms the stream using the given closure in "stateful" mode, where stateful operators
3188    /// such as `fold` retrain their memory across ticks rather than resetting across batches of
3189    /// input.
3190    ///
3191    /// This API is particularly useful for stateful computation on batches of data, such as
3192    /// maintaining an accumulated state that is up to date with the current batch.
3193    ///
3194    /// # Example
3195    /// ```rust
3196    /// # #[cfg(feature = "deploy")] {
3197    /// # use hydro_lang::prelude::*;
3198    /// # use futures::StreamExt;
3199    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3200    /// let tick = process.tick();
3201    /// # // ticks are lazy by default, forces the second tick to run
3202    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3203    /// # let batch_first_tick = process
3204    /// #   .source_iter(q!(vec![1, 2, 3, 4]))
3205    /// #  .batch(&tick, nondet!(/** test */));
3206    /// # let batch_second_tick = process
3207    /// #   .source_iter(q!(vec![5, 6, 7]))
3208    /// #   .batch(&tick, nondet!(/** test */))
3209    /// #   .defer_tick(); // appears on the second tick
3210    /// let input = // [1, 2, 3, 4 (first batch), 5, 6, 7 (second batch)]
3211    /// # batch_first_tick.chain(batch_second_tick).all_ticks();
3212    ///
3213    /// input.batch(&tick, nondet!(/** test */))
3214    ///     .across_ticks(|s| s.count()).all_ticks()
3215    /// # }, |mut stream| async move {
3216    /// // [4, 7]
3217    /// assert_eq!(stream.next().await.unwrap(), 4);
3218    /// assert_eq!(stream.next().await.unwrap(), 7);
3219    /// # }));
3220    /// # }
3221    /// ```
3222    pub fn across_ticks<Out: BatchAtomic<'a>>(
3223        self,
3224        thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3225    ) -> Out::Batched {
3226        thunk(self.all_ticks_atomic()).batched_atomic()
3227    }
3228
3229    /// Shifts the elements in `self` to the **next tick**, so that the returned stream at tick `T`
3230    /// always has the elements of `self` at tick `T - 1`.
3231    ///
3232    /// At tick `0`, the output stream is empty, since there is no previous tick.
3233    ///
3234    /// This operator enables stateful iterative processing with ticks, by sending data from one
3235    /// tick to the next. For example, you can use it to compare inputs across consecutive batches.
3236    ///
3237    /// # Example
3238    /// ```rust
3239    /// # #[cfg(feature = "deploy")] {
3240    /// # use hydro_lang::prelude::*;
3241    /// # use futures::StreamExt;
3242    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
3243    /// let tick = process.tick();
3244    /// // ticks are lazy by default, forces the second tick to run
3245    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
3246    ///
3247    /// let batch_first_tick = process
3248    ///   .source_iter(q!(vec![1, 2, 3, 4]))
3249    ///   .batch(&tick, nondet!(/** test */));
3250    /// let batch_second_tick = process
3251    ///   .source_iter(q!(vec![0, 3, 4, 5, 6]))
3252    ///   .batch(&tick, nondet!(/** test */))
3253    ///   .defer_tick(); // appears on the second tick
3254    /// let changes_across_ticks = batch_first_tick.chain(batch_second_tick);
3255    ///
3256    /// changes_across_ticks.clone().filter_not_in(
3257    ///     changes_across_ticks.defer_tick() // the elements from the previous tick
3258    /// ).all_ticks()
3259    /// # }, |mut stream| async move {
3260    /// // [1, 2, 3, 4 /* first tick */, 0, 5, 6 /* second tick */]
3261    /// # for w in vec![1, 2, 3, 4, 0, 5, 6] {
3262    /// #     assert_eq!(stream.next().await.unwrap(), w);
3263    /// # }
3264    /// # }));
3265    /// # }
3266    /// ```
3267    pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3268        Stream::new(
3269            self.location.clone(),
3270            HydroNode::DeferTick {
3271                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3272                metadata: self
3273                    .location
3274                    .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3275            },
3276        )
3277    }
3278}
3279
3280#[cfg(test)]
3281mod tests {
3282    #[cfg(feature = "deploy")]
3283    use futures::{SinkExt, StreamExt};
3284    #[cfg(feature = "deploy")]
3285    use hydro_deploy::Deployment;
3286    #[cfg(feature = "deploy")]
3287    use serde::{Deserialize, Serialize};
3288    #[cfg(any(feature = "deploy", feature = "sim"))]
3289    use stageleft::q;
3290
3291    #[cfg(any(feature = "deploy", feature = "sim"))]
3292    use crate::compile::builder::FlowBuilder;
3293    #[cfg(feature = "deploy")]
3294    use crate::live_collections::sliced::sliced;
3295    #[cfg(feature = "deploy")]
3296    use crate::live_collections::stream::ExactlyOnce;
3297    #[cfg(feature = "sim")]
3298    use crate::live_collections::stream::NoOrder;
3299    #[cfg(any(feature = "deploy", feature = "sim"))]
3300    use crate::live_collections::stream::TotalOrder;
3301    #[cfg(any(feature = "deploy", feature = "sim"))]
3302    use crate::location::Location;
3303    #[cfg(feature = "sim")]
3304    use crate::networking::TCP;
3305    #[cfg(any(feature = "deploy", feature = "sim"))]
3306    use crate::nondet::nondet;
3307
3308    mod backtrace_chained_ops;
3309
3310    #[cfg(feature = "deploy")]
3311    struct P1 {}
3312    #[cfg(feature = "deploy")]
3313    struct P2 {}
3314
3315    #[cfg(feature = "deploy")]
3316    #[derive(Serialize, Deserialize, Debug)]
3317    struct SendOverNetwork {
3318        n: u32,
3319    }
3320
3321    #[cfg(feature = "deploy")]
3322    #[tokio::test]
3323    async fn first_ten_distributed() {
3324        use crate::networking::TCP;
3325
3326        let mut deployment = Deployment::new();
3327
3328        let mut flow = FlowBuilder::new();
3329        let first_node = flow.process::<P1>();
3330        let second_node = flow.process::<P2>();
3331        let external = flow.external::<P2>();
3332
3333        let numbers = first_node.source_iter(q!(0..10));
3334        let out_port = numbers
3335            .map(q!(|n| SendOverNetwork { n }))
3336            .send(&second_node, TCP.fail_stop().bincode())
3337            .send_bincode_external(&external);
3338
3339        let nodes = flow
3340            .with_process(&first_node, deployment.Localhost())
3341            .with_process(&second_node, deployment.Localhost())
3342            .with_external(&external, deployment.Localhost())
3343            .deploy(&mut deployment);
3344
3345        deployment.deploy().await.unwrap();
3346
3347        let mut external_out = nodes.connect(out_port).await;
3348
3349        deployment.start().await.unwrap();
3350
3351        for i in 0..10 {
3352            assert_eq!(external_out.next().await.unwrap().n, i);
3353        }
3354    }
3355
3356    #[cfg(feature = "deploy")]
3357    #[tokio::test]
3358    async fn first_cardinality() {
3359        let mut deployment = Deployment::new();
3360
3361        let mut flow = FlowBuilder::new();
3362        let node = flow.process::<()>();
3363        let external = flow.external::<()>();
3364
3365        let node_tick = node.tick();
3366        let count = node_tick
3367            .singleton(q!([1, 2, 3]))
3368            .into_stream()
3369            .flatten_ordered()
3370            .first()
3371            .into_stream()
3372            .count()
3373            .all_ticks()
3374            .send_bincode_external(&external);
3375
3376        let nodes = flow
3377            .with_process(&node, deployment.Localhost())
3378            .with_external(&external, deployment.Localhost())
3379            .deploy(&mut deployment);
3380
3381        deployment.deploy().await.unwrap();
3382
3383        let mut external_out = nodes.connect(count).await;
3384
3385        deployment.start().await.unwrap();
3386
3387        assert_eq!(external_out.next().await.unwrap(), 1);
3388    }
3389
3390    #[cfg(feature = "deploy")]
3391    #[tokio::test]
3392    async fn unbounded_reduce_remembers_state() {
3393        let mut deployment = Deployment::new();
3394
3395        let mut flow = FlowBuilder::new();
3396        let node = flow.process::<()>();
3397        let external = flow.external::<()>();
3398
3399        let (input_port, input) = node.source_external_bincode(&external);
3400        let out = input
3401            .reduce(q!(|acc, v| *acc += v))
3402            .sample_eager(nondet!(/** test */))
3403            .send_bincode_external(&external);
3404
3405        let nodes = flow
3406            .with_process(&node, deployment.Localhost())
3407            .with_external(&external, deployment.Localhost())
3408            .deploy(&mut deployment);
3409
3410        deployment.deploy().await.unwrap();
3411
3412        let mut external_in = nodes.connect(input_port).await;
3413        let mut external_out = nodes.connect(out).await;
3414
3415        deployment.start().await.unwrap();
3416
3417        external_in.send(1).await.unwrap();
3418        assert_eq!(external_out.next().await.unwrap(), 1);
3419
3420        external_in.send(2).await.unwrap();
3421        assert_eq!(external_out.next().await.unwrap(), 3);
3422    }
3423
3424    #[cfg(feature = "deploy")]
3425    #[tokio::test]
3426    async fn top_level_bounded_cross_singleton() {
3427        let mut deployment = Deployment::new();
3428
3429        let mut flow = FlowBuilder::new();
3430        let node = flow.process::<()>();
3431        let external = flow.external::<()>();
3432
3433        let (input_port, input) =
3434            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3435
3436        let out = input
3437            .cross_singleton(
3438                node.source_iter(q!(vec![1, 2, 3]))
3439                    .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3440            )
3441            .send_bincode_external(&external);
3442
3443        let nodes = flow
3444            .with_process(&node, deployment.Localhost())
3445            .with_external(&external, deployment.Localhost())
3446            .deploy(&mut deployment);
3447
3448        deployment.deploy().await.unwrap();
3449
3450        let mut external_in = nodes.connect(input_port).await;
3451        let mut external_out = nodes.connect(out).await;
3452
3453        deployment.start().await.unwrap();
3454
3455        external_in.send(1).await.unwrap();
3456        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3457
3458        external_in.send(2).await.unwrap();
3459        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3460    }
3461
3462    #[cfg(feature = "deploy")]
3463    #[tokio::test]
3464    async fn top_level_bounded_reduce_cardinality() {
3465        let mut deployment = Deployment::new();
3466
3467        let mut flow = FlowBuilder::new();
3468        let node = flow.process::<()>();
3469        let external = flow.external::<()>();
3470
3471        let (input_port, input) =
3472            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3473
3474        let out = sliced! {
3475            let input = use(input, nondet!(/** test */));
3476            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!(/** test */));
3477            input.cross_singleton(v.into_stream().count())
3478        }
3479        .send_bincode_external(&external);
3480
3481        let nodes = flow
3482            .with_process(&node, deployment.Localhost())
3483            .with_external(&external, deployment.Localhost())
3484            .deploy(&mut deployment);
3485
3486        deployment.deploy().await.unwrap();
3487
3488        let mut external_in = nodes.connect(input_port).await;
3489        let mut external_out = nodes.connect(out).await;
3490
3491        deployment.start().await.unwrap();
3492
3493        external_in.send(1).await.unwrap();
3494        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3495
3496        external_in.send(2).await.unwrap();
3497        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3498    }
3499
3500    #[cfg(feature = "deploy")]
3501    #[tokio::test]
3502    async fn top_level_bounded_into_singleton_cardinality() {
3503        let mut deployment = Deployment::new();
3504
3505        let mut flow = FlowBuilder::new();
3506        let node = flow.process::<()>();
3507        let external = flow.external::<()>();
3508
3509        let (input_port, input) =
3510            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3511
3512        let out = sliced! {
3513            let input = use(input, nondet!(/** test */));
3514            let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!(/** test */));
3515            input.cross_singleton(v.into_stream().count())
3516        }
3517        .send_bincode_external(&external);
3518
3519        let nodes = flow
3520            .with_process(&node, deployment.Localhost())
3521            .with_external(&external, deployment.Localhost())
3522            .deploy(&mut deployment);
3523
3524        deployment.deploy().await.unwrap();
3525
3526        let mut external_in = nodes.connect(input_port).await;
3527        let mut external_out = nodes.connect(out).await;
3528
3529        deployment.start().await.unwrap();
3530
3531        external_in.send(1).await.unwrap();
3532        assert_eq!(external_out.next().await.unwrap(), (1, 1));
3533
3534        external_in.send(2).await.unwrap();
3535        assert_eq!(external_out.next().await.unwrap(), (2, 1));
3536    }
3537
3538    #[cfg(feature = "deploy")]
3539    #[tokio::test]
3540    async fn atomic_fold_replays_each_tick() {
3541        let mut deployment = Deployment::new();
3542
3543        let mut flow = FlowBuilder::new();
3544        let node = flow.process::<()>();
3545        let external = flow.external::<()>();
3546
3547        let (input_port, input) =
3548            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3549        let tick = node.tick();
3550
3551        let out = input
3552            .batch(&tick, nondet!(/** test */))
3553            .cross_singleton(
3554                node.source_iter(q!(vec![1, 2, 3]))
3555                    .atomic()
3556                    .fold(q!(|| 0), q!(|acc, v| *acc += v))
3557                    .snapshot_atomic(&tick, nondet!(/** test */)),
3558            )
3559            .all_ticks()
3560            .send_bincode_external(&external);
3561
3562        let nodes = flow
3563            .with_process(&node, deployment.Localhost())
3564            .with_external(&external, deployment.Localhost())
3565            .deploy(&mut deployment);
3566
3567        deployment.deploy().await.unwrap();
3568
3569        let mut external_in = nodes.connect(input_port).await;
3570        let mut external_out = nodes.connect(out).await;
3571
3572        deployment.start().await.unwrap();
3573
3574        external_in.send(1).await.unwrap();
3575        assert_eq!(external_out.next().await.unwrap(), (1, 6));
3576
3577        external_in.send(2).await.unwrap();
3578        assert_eq!(external_out.next().await.unwrap(), (2, 6));
3579    }
3580
3581    #[cfg(feature = "deploy")]
3582    #[tokio::test]
3583    async fn unbounded_scan_remembers_state() {
3584        let mut deployment = Deployment::new();
3585
3586        let mut flow = FlowBuilder::new();
3587        let node = flow.process::<()>();
3588        let external = flow.external::<()>();
3589
3590        let (input_port, input) = node.source_external_bincode(&external);
3591        let out = input
3592            .scan(
3593                q!(|| 0),
3594                q!(|acc, v| {
3595                    *acc += v;
3596                    Some(*acc)
3597                }),
3598            )
3599            .send_bincode_external(&external);
3600
3601        let nodes = flow
3602            .with_process(&node, deployment.Localhost())
3603            .with_external(&external, deployment.Localhost())
3604            .deploy(&mut deployment);
3605
3606        deployment.deploy().await.unwrap();
3607
3608        let mut external_in = nodes.connect(input_port).await;
3609        let mut external_out = nodes.connect(out).await;
3610
3611        deployment.start().await.unwrap();
3612
3613        external_in.send(1).await.unwrap();
3614        assert_eq!(external_out.next().await.unwrap(), 1);
3615
3616        external_in.send(2).await.unwrap();
3617        assert_eq!(external_out.next().await.unwrap(), 3);
3618    }
3619
3620    #[cfg(feature = "deploy")]
3621    #[tokio::test]
3622    async fn unbounded_enumerate_remembers_state() {
3623        let mut deployment = Deployment::new();
3624
3625        let mut flow = FlowBuilder::new();
3626        let node = flow.process::<()>();
3627        let external = flow.external::<()>();
3628
3629        let (input_port, input) = node.source_external_bincode(&external);
3630        let out = input.enumerate().send_bincode_external(&external);
3631
3632        let nodes = flow
3633            .with_process(&node, deployment.Localhost())
3634            .with_external(&external, deployment.Localhost())
3635            .deploy(&mut deployment);
3636
3637        deployment.deploy().await.unwrap();
3638
3639        let mut external_in = nodes.connect(input_port).await;
3640        let mut external_out = nodes.connect(out).await;
3641
3642        deployment.start().await.unwrap();
3643
3644        external_in.send(1).await.unwrap();
3645        assert_eq!(external_out.next().await.unwrap(), (0, 1));
3646
3647        external_in.send(2).await.unwrap();
3648        assert_eq!(external_out.next().await.unwrap(), (1, 2));
3649    }
3650
3651    #[cfg(feature = "deploy")]
3652    #[tokio::test]
3653    async fn unbounded_unique_remembers_state() {
3654        let mut deployment = Deployment::new();
3655
3656        let mut flow = FlowBuilder::new();
3657        let node = flow.process::<()>();
3658        let external = flow.external::<()>();
3659
3660        let (input_port, input) =
3661            node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3662        let out = input.unique().send_bincode_external(&external);
3663
3664        let nodes = flow
3665            .with_process(&node, deployment.Localhost())
3666            .with_external(&external, deployment.Localhost())
3667            .deploy(&mut deployment);
3668
3669        deployment.deploy().await.unwrap();
3670
3671        let mut external_in = nodes.connect(input_port).await;
3672        let mut external_out = nodes.connect(out).await;
3673
3674        deployment.start().await.unwrap();
3675
3676        external_in.send(1).await.unwrap();
3677        assert_eq!(external_out.next().await.unwrap(), 1);
3678
3679        external_in.send(2).await.unwrap();
3680        assert_eq!(external_out.next().await.unwrap(), 2);
3681
3682        external_in.send(1).await.unwrap();
3683        external_in.send(3).await.unwrap();
3684        assert_eq!(external_out.next().await.unwrap(), 3);
3685    }
3686
3687    #[cfg(feature = "sim")]
3688    #[test]
3689    #[should_panic]
3690    fn sim_batch_nondet_size() {
3691        let mut flow = FlowBuilder::new();
3692        let node = flow.process::<()>();
3693
3694        let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3695
3696        let tick = node.tick();
3697        let out_recv = input
3698            .batch(&tick, nondet!(/** test */))
3699            .count()
3700            .all_ticks()
3701            .sim_output();
3702
3703        flow.sim().exhaustive(async || {
3704            in_send.send(());
3705            in_send.send(());
3706            in_send.send(());
3707
3708            assert_eq!(out_recv.next().await.unwrap(), 3); // fails with nondet batching
3709        });
3710    }
3711
3712    #[cfg(feature = "sim")]
3713    #[test]
3714    fn sim_batch_preserves_order() {
3715        let mut flow = FlowBuilder::new();
3716        let node = flow.process::<()>();
3717
3718        let (in_send, input) = node.sim_input();
3719
3720        let tick = node.tick();
3721        let out_recv = input
3722            .batch(&tick, nondet!(/** test */))
3723            .all_ticks()
3724            .sim_output();
3725
3726        flow.sim().exhaustive(async || {
3727            in_send.send(1);
3728            in_send.send(2);
3729            in_send.send(3);
3730
3731            out_recv.assert_yields_only([1, 2, 3]).await;
3732        });
3733    }
3734
3735    #[cfg(feature = "sim")]
3736    #[test]
3737    #[should_panic]
3738    fn sim_batch_unordered_shuffles() {
3739        let mut flow = FlowBuilder::new();
3740        let node = flow.process::<()>();
3741
3742        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3743
3744        let tick = node.tick();
3745        let batch = input.batch(&tick, nondet!(/** test */));
3746        let out_recv = batch
3747            .clone()
3748            .min()
3749            .zip(batch.max())
3750            .all_ticks()
3751            .sim_output();
3752
3753        flow.sim().exhaustive(async || {
3754            in_send.send_many_unordered([1, 2, 3]);
3755
3756            if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3757                panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3758            }
3759        });
3760    }
3761
3762    #[cfg(feature = "sim")]
3763    #[test]
3764    fn sim_batch_unordered_shuffles_count() {
3765        let mut flow = FlowBuilder::new();
3766        let node = flow.process::<()>();
3767
3768        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3769
3770        let tick = node.tick();
3771        let batch = input.batch(&tick, nondet!(/** test */));
3772        let out_recv = batch.all_ticks().sim_output();
3773
3774        let instance_count = flow.sim().exhaustive(async || {
3775            in_send.send_many_unordered([1, 2, 3, 4]);
3776            out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3777        });
3778
3779        assert_eq!(
3780            instance_count,
3781            75 // ∑ (k=1 to 4) S(4,k) × k! = 75
3782        )
3783    }
3784
3785    #[cfg(feature = "sim")]
3786    #[test]
3787    #[should_panic]
3788    fn sim_observe_order_batched() {
3789        let mut flow = FlowBuilder::new();
3790        let node = flow.process::<()>();
3791
3792        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3793
3794        let tick = node.tick();
3795        let batch = input.batch(&tick, nondet!(/** test */));
3796        let out_recv = batch
3797            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3798            .all_ticks()
3799            .sim_output();
3800
3801        flow.sim().exhaustive(async || {
3802            in_send.send_many_unordered([1, 2, 3, 4]);
3803            out_recv.assert_yields_only([1, 2, 3, 4]).await; // fails with assume_ordering
3804        });
3805    }
3806
3807    #[cfg(feature = "sim")]
3808    #[test]
3809    fn sim_observe_order_batched_count() {
3810        let mut flow = FlowBuilder::new();
3811        let node = flow.process::<()>();
3812
3813        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3814
3815        let tick = node.tick();
3816        let batch = input.batch(&tick, nondet!(/** test */));
3817        let out_recv = batch
3818            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3819            .all_ticks()
3820            .sim_output();
3821
3822        let instance_count = flow.sim().exhaustive(async || {
3823            in_send.send_many_unordered([1, 2, 3, 4]);
3824            let _ = out_recv.collect::<Vec<_>>().await;
3825        });
3826
3827        assert_eq!(
3828            instance_count,
3829            192 // 4! * 2^{4 - 1}
3830        )
3831    }
3832
3833    #[cfg(feature = "sim")]
3834    #[test]
3835    fn sim_unordered_count_instance_count() {
3836        let mut flow = FlowBuilder::new();
3837        let node = flow.process::<()>();
3838
3839        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3840
3841        let tick = node.tick();
3842        let out_recv = input
3843            .count()
3844            .snapshot(&tick, nondet!(/** test */))
3845            .all_ticks()
3846            .sim_output();
3847
3848        let instance_count = flow.sim().exhaustive(async || {
3849            in_send.send_many_unordered([1, 2, 3, 4]);
3850            assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3851        });
3852
3853        assert_eq!(
3854            instance_count,
3855            16 // 2^4, { 0, 1, 2, 3 } can be a snapshot and 4 is always included
3856        )
3857    }
3858
3859    #[cfg(feature = "sim")]
3860    #[test]
3861    fn sim_top_level_assume_ordering() {
3862        let mut flow = FlowBuilder::new();
3863        let node = flow.process::<()>();
3864
3865        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3866
3867        let out_recv = input
3868            .assume_ordering::<TotalOrder>(nondet!(/** test */))
3869            .sim_output();
3870
3871        let instance_count = flow.sim().exhaustive(async || {
3872            in_send.send_many_unordered([1, 2, 3]);
3873            let mut out = out_recv.collect::<Vec<_>>().await;
3874            out.sort();
3875            assert_eq!(out, vec![1, 2, 3]);
3876        });
3877
3878        assert_eq!(instance_count, 6)
3879    }
3880
3881    #[cfg(feature = "sim")]
3882    #[test]
3883    fn sim_top_level_assume_ordering_cycle_back() {
3884        let mut flow = FlowBuilder::new();
3885        let node = flow.process::<()>();
3886        let node2 = flow.process::<()>();
3887
3888        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3889
3890        let (complete_cycle_back, cycle_back) =
3891            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3892        let ordered = input
3893            .merge_unordered(cycle_back)
3894            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3895        complete_cycle_back.complete(
3896            ordered
3897                .clone()
3898                .map(q!(|v| v + 1))
3899                .filter(q!(|v| v % 2 == 1))
3900                .send(&node2, TCP.fail_stop().bincode())
3901                .send(&node, TCP.fail_stop().bincode()),
3902        );
3903
3904        let out_recv = ordered.sim_output();
3905
3906        let mut saw = false;
3907        let instance_count = flow.sim().exhaustive(async || {
3908            in_send.send_many_unordered([0, 2]);
3909            let out = out_recv.collect::<Vec<_>>().await;
3910
3911            if out.starts_with(&[0, 1, 2]) {
3912                saw = true;
3913            }
3914        });
3915
3916        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3917        assert_eq!(instance_count, 6);
3918    }
3919
3920    #[cfg(feature = "sim")]
3921    #[test]
3922    fn sim_top_level_assume_ordering_cycle_back_tick() {
3923        let mut flow = FlowBuilder::new();
3924        let node = flow.process::<()>();
3925        let node2 = flow.process::<()>();
3926
3927        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3928
3929        let (complete_cycle_back, cycle_back) =
3930            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3931        let ordered = input
3932            .merge_unordered(cycle_back)
3933            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3934        complete_cycle_back.complete(
3935            ordered
3936                .clone()
3937                .batch(&node.tick(), nondet!(/** test */))
3938                .all_ticks()
3939                .map(q!(|v| v + 1))
3940                .filter(q!(|v| v % 2 == 1))
3941                .send(&node2, TCP.fail_stop().bincode())
3942                .send(&node, TCP.fail_stop().bincode()),
3943        );
3944
3945        let out_recv = ordered.sim_output();
3946
3947        let mut saw = false;
3948        let instance_count = flow.sim().exhaustive(async || {
3949            in_send.send_many_unordered([0, 2]);
3950            let out = out_recv.collect::<Vec<_>>().await;
3951
3952            if out.starts_with(&[0, 1, 2]) {
3953                saw = true;
3954            }
3955        });
3956
3957        assert!(saw, "did not see an instance with 0, 1, 2 in order");
3958        assert_eq!(instance_count, 58);
3959    }
3960
3961    #[cfg(feature = "sim")]
3962    #[test]
3963    fn sim_top_level_assume_ordering_multiple() {
3964        let mut flow = FlowBuilder::new();
3965        let node = flow.process::<()>();
3966        let node2 = flow.process::<()>();
3967
3968        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3969        let (_, input2) = node.sim_input::<_, NoOrder, _>();
3970
3971        let (complete_cycle_back, cycle_back) =
3972            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3973        let input1_ordered = input
3974            .clone()
3975            .merge_unordered(cycle_back)
3976            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3977        let foo = input1_ordered
3978            .clone()
3979            .map(q!(|v| v + 3))
3980            .weaken_ordering::<NoOrder>()
3981            .merge_unordered(input2)
3982            .assume_ordering::<TotalOrder>(nondet!(/** test */));
3983
3984        complete_cycle_back.complete(
3985            foo.filter(q!(|v| *v == 3))
3986                .send(&node2, TCP.fail_stop().bincode())
3987                .send(&node, TCP.fail_stop().bincode()),
3988        );
3989
3990        let out_recv = input1_ordered.sim_output();
3991
3992        let mut saw = false;
3993        let instance_count = flow.sim().exhaustive(async || {
3994            in_send.send_many_unordered([0, 1]);
3995            let out = out_recv.collect::<Vec<_>>().await;
3996
3997            if out.starts_with(&[0, 3, 1]) {
3998                saw = true;
3999            }
4000        });
4001
4002        assert!(saw, "did not see an instance with 0, 3, 1 in order");
4003        assert_eq!(instance_count, 24);
4004    }
4005
4006    #[cfg(feature = "sim")]
4007    #[test]
4008    fn sim_atomic_assume_ordering_cycle_back() {
4009        let mut flow = FlowBuilder::new();
4010        let node = flow.process::<()>();
4011        let node2 = flow.process::<()>();
4012
4013        let (in_send, input) = node.sim_input::<_, NoOrder, _>();
4014
4015        let (complete_cycle_back, cycle_back) =
4016            node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
4017        let ordered = input
4018            .merge_unordered(cycle_back)
4019            .atomic()
4020            .assume_ordering::<TotalOrder>(nondet!(/** test */))
4021            .end_atomic();
4022        complete_cycle_back.complete(
4023            ordered
4024                .clone()
4025                .map(q!(|v| v + 1))
4026                .filter(q!(|v| v % 2 == 1))
4027                .send(&node2, TCP.fail_stop().bincode())
4028                .send(&node, TCP.fail_stop().bincode()),
4029        );
4030
4031        let out_recv = ordered.sim_output();
4032
4033        let instance_count = flow.sim().exhaustive(async || {
4034            in_send.send_many_unordered([0, 2]);
4035            let out = out_recv.collect::<Vec<_>>().await;
4036            assert_eq!(out.len(), 4);
4037        });
4038        assert_eq!(instance_count, 22);
4039    }
4040
4041    #[cfg(feature = "deploy")]
4042    #[tokio::test]
4043    async fn partition_evens_odds() {
4044        let mut deployment = Deployment::new();
4045
4046        let mut flow = FlowBuilder::new();
4047        let node = flow.process::<()>();
4048        let external = flow.external::<()>();
4049
4050        let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
4051        let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
4052        let evens_port = evens.send_bincode_external(&external);
4053        let odds_port = odds.send_bincode_external(&external);
4054
4055        let nodes = flow
4056            .with_process(&node, deployment.Localhost())
4057            .with_external(&external, deployment.Localhost())
4058            .deploy(&mut deployment);
4059
4060        deployment.deploy().await.unwrap();
4061
4062        let mut evens_out = nodes.connect(evens_port).await;
4063        let mut odds_out = nodes.connect(odds_port).await;
4064
4065        deployment.start().await.unwrap();
4066
4067        let mut even_results = Vec::new();
4068        for _ in 0..3 {
4069            even_results.push(evens_out.next().await.unwrap());
4070        }
4071        even_results.sort();
4072        assert_eq!(even_results, vec![2, 4, 6]);
4073
4074        let mut odd_results = Vec::new();
4075        for _ in 0..3 {
4076            odd_results.push(odds_out.next().await.unwrap());
4077        }
4078        odd_results.sort();
4079        assert_eq!(odd_results, vec![1, 3, 5]);
4080    }
4081
4082    #[cfg(feature = "deploy")]
4083    #[tokio::test]
4084    async fn unconsumed_inspect_still_runs() {
4085        use crate::deploy::DeployCrateWrapper;
4086
4087        let mut deployment = Deployment::new();
4088
4089        let mut flow = FlowBuilder::new();
4090        let node = flow.process::<()>();
4091
4092        // The return value of .inspect() is intentionally dropped.
4093        // Before the Null-root fix, this would silently do nothing.
4094        node.source_iter(q!(0..5))
4095            .inspect(q!(|x| println!("inspect: {}", x)));
4096
4097        let nodes = flow
4098            .with_process(&node, deployment.Localhost())
4099            .deploy(&mut deployment);
4100
4101        deployment.deploy().await.unwrap();
4102
4103        let mut stdout = nodes.get_process(&node).stdout();
4104
4105        deployment.start().await.unwrap();
4106
4107        let mut lines = Vec::new();
4108        for _ in 0..5 {
4109            lines.push(stdout.recv().await.unwrap());
4110        }
4111        lines.sort();
4112        assert_eq!(
4113            lines,
4114            vec![
4115                "inspect: 0",
4116                "inspect: 1",
4117                "inspect: 2",
4118                "inspect: 3",
4119                "inspect: 4",
4120            ]
4121        );
4122    }
4123
4124    #[cfg(feature = "sim")]
4125    #[test]
4126    fn sim_limit() {
4127        let mut flow = FlowBuilder::new();
4128        let node = flow.process::<()>();
4129
4130        let (in_send, input) = node.sim_input();
4131
4132        let out_recv = input.limit(q!(3)).sim_output();
4133
4134        flow.sim().exhaustive(async || {
4135            in_send.send(1);
4136            in_send.send(2);
4137            in_send.send(3);
4138            in_send.send(4);
4139            in_send.send(5);
4140
4141            out_recv.assert_yields_only([1, 2, 3]).await;
4142        });
4143    }
4144
4145    #[cfg(feature = "sim")]
4146    #[test]
4147    fn sim_limit_zero() {
4148        let mut flow = FlowBuilder::new();
4149        let node = flow.process::<()>();
4150
4151        let (in_send, input) = node.sim_input();
4152
4153        let out_recv = input.limit(q!(0)).sim_output();
4154
4155        flow.sim().exhaustive(async || {
4156            in_send.send(1);
4157            in_send.send(2);
4158
4159            out_recv.assert_yields_only::<i32, _>([]).await;
4160        });
4161    }
4162
4163    #[cfg(feature = "sim")]
4164    #[test]
4165    fn sim_merge_ordered() {
4166        let mut flow = FlowBuilder::new();
4167        let node = flow.process::<()>();
4168
4169        let (in_send, input) = node.sim_input();
4170        let (in_send2, input2) = node.sim_input();
4171
4172        let out_recv = input
4173            .merge_ordered(input2, nondet!(/** test */))
4174            .sim_output();
4175
4176        let mut saw_out_of_order = false;
4177        let instances = flow.sim().exhaustive(async || {
4178            in_send.send(1);
4179            in_send.send(2);
4180            in_send2.send(3);
4181            in_send2.send(4);
4182
4183            let out = out_recv.collect::<Vec<_>>().await;
4184
4185            if out == [1, 3, 2, 4] {
4186                saw_out_of_order = true;
4187            }
4188
4189            // Assert ordering preservation: elements from each input must
4190            // appear in their original relative order.
4191            let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
4192            let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
4193            assert_eq!(
4194                first_elements,
4195                vec![1, 2],
4196                "first input order violated: {:?}",
4197                out
4198            );
4199            assert_eq!(
4200                second_elements,
4201                vec![3, 4],
4202                "second input order violated: {:?}",
4203                out
4204            );
4205
4206            first_elements.append(&mut second_elements);
4207            first_elements.sort();
4208            assert_eq!(first_elements, vec![1, 2, 3, 4]);
4209        });
4210
4211        assert!(saw_out_of_order);
4212        assert_eq!(instances, 6);
4213    }
4214
4215    /// Tests that merge_ordered passes through elements when only one input
4216    /// has data.
4217    #[cfg(feature = "sim")]
4218    #[test]
4219    fn sim_merge_ordered_one_empty() {
4220        let mut flow = FlowBuilder::new();
4221        let node = flow.process::<()>();
4222
4223        let (in_send, input) = node.sim_input();
4224        let (_in_send2, input2) = node.sim_input();
4225
4226        let out_recv = input
4227            .merge_ordered(input2, nondet!(/** test */))
4228            .sim_output();
4229
4230        let instances = flow.sim().exhaustive(async || {
4231            in_send.send(1);
4232            in_send.send(2);
4233
4234            let out = out_recv.collect::<Vec<_>>().await;
4235            assert_eq!(out, vec![1, 2]);
4236        });
4237
4238        // Only one possible interleaving when one input is empty
4239        assert_eq!(instances, 1);
4240    }
4241
4242    /// Tests that merge_ordered correctly handles feedback cycles.
4243    /// An element output from merge_ordered is filtered and cycled back to
4244    /// one of its inputs. The one-at-a-time release must allow the cycled-back
4245    /// element to arrive and potentially be emitted before elements still
4246    /// waiting on the other input.
4247    #[cfg(feature = "sim")]
4248    #[test]
4249    fn sim_merge_ordered_cycle_back() {
4250        let mut flow = FlowBuilder::new();
4251        let node = flow.process::<()>();
4252
4253        let (in_send, input) = node.sim_input();
4254
4255        // Create a forward ref for the cycle back
4256        let (complete_cycle_back, cycle_back) =
4257            node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4258
4259        // merge_ordered: input (external) with cycle_back
4260        let merged = input.merge_ordered(cycle_back, nondet!(/** test */));
4261
4262        // Cycle back: elements equal to 1 get mapped to 10 and fed back
4263        complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4264
4265        let out_recv = merged.sim_output();
4266
4267        // Send 1 and 2. Element 1 should cycle back as 10.
4268        // Valid orderings must have 1 before 10 (since 10 depends on 1).
4269        let mut saw_cycle_before_second = false;
4270        flow.sim().exhaustive(async || {
4271            in_send.send(1);
4272            in_send.send(2);
4273
4274            let out = out_recv.collect::<Vec<_>>().await;
4275
4276            // 10 must always come after 1 (causal dependency)
4277            let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4278            let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4279            assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4280
4281            // Check if we see [1, 10, 2] — the cycled element beats the second input
4282            if out == [1, 10, 2] {
4283                saw_cycle_before_second = true;
4284            }
4285
4286            let mut sorted = out;
4287            sorted.sort();
4288            assert_eq!(sorted, vec![1, 2, 10]);
4289        });
4290
4291        assert!(
4292            saw_cycle_before_second,
4293            "never saw the cycled element arrive before the second input element"
4294        );
4295    }
4296
4297    /// Tests that merge_ordered correctly interleaves when one input has a
4298    /// delayed element. With a: [1, _delay_, 2] and b: [3, 4], the delayed
4299    /// element 2 should be able to appear after b's elements.
4300    #[cfg(feature = "sim")]
4301    #[test]
4302    fn sim_merge_ordered_delayed() {
4303        let mut flow = FlowBuilder::new();
4304        let node = flow.process::<()>();
4305
4306        let (in_send, input) = node.sim_input();
4307        let (in_send2, input2) = node.sim_input();
4308
4309        let out_recv = input
4310            .merge_ordered(input2, nondet!(/** test */))
4311            .sim_output();
4312
4313        let mut saw_delayed_interleaving = false;
4314        flow.sim().exhaustive(async || {
4315            // Send 1 from a, and 3, 4 from b
4316            in_send.send(1);
4317            in_send2.send(3);
4318            in_send2.send(4);
4319
4320            // Collect what's available so far
4321            let first_batch = out_recv.collect::<Vec<_>>().await;
4322
4323            // Now send the delayed element 2 from a
4324            in_send.send(2);
4325            let second_batch = out_recv.collect::<Vec<_>>().await;
4326
4327            let mut all: Vec<_> = first_batch
4328                .iter()
4329                .chain(second_batch.iter())
4330                .copied()
4331                .collect();
4332
4333            // Check if we saw [1, 3, 4, 2] — the delayed interleaving
4334            if all == [1, 3, 4, 2] {
4335                saw_delayed_interleaving = true;
4336            }
4337
4338            all.sort();
4339            assert_eq!(all, vec![1, 2, 3, 4]);
4340        });
4341
4342        assert!(saw_delayed_interleaving);
4343    }
4344
4345    /// Deploy test: merge_ordered with a delayed element on one input.
4346    /// Sends a=1, b=3, b=4, then after receiving those, sends a=2.
4347    /// Expects to see [1, 3, 4] first, then [2] — demonstrating that
4348    /// both inputs are pulled and the delayed element arrives later.
4349    #[cfg(feature = "deploy")]
4350    #[tokio::test]
4351    async fn deploy_merge_ordered_delayed() {
4352        let mut deployment = Deployment::new();
4353
4354        let mut flow = FlowBuilder::new();
4355        let node = flow.process::<()>();
4356        let external = flow.external::<()>();
4357
4358        let (input_a_port, input_a) = node.source_external_bincode(&external);
4359        let (input_b_port, input_b) = node.source_external_bincode(&external);
4360
4361        let out = input_a
4362            .assume_ordering(nondet!(/** test */))
4363            .merge_ordered(
4364                input_b.assume_ordering(nondet!(/** test */)),
4365                nondet!(/** test */),
4366            )
4367            .send_bincode_external(&external);
4368
4369        let nodes = flow
4370            .with_process(&node, deployment.Localhost())
4371            .with_external(&external, deployment.Localhost())
4372            .deploy(&mut deployment);
4373
4374        deployment.deploy().await.unwrap();
4375
4376        let mut ext_a = nodes.connect(input_a_port).await;
4377        let mut ext_b = nodes.connect(input_b_port).await;
4378        let mut ext_out = nodes.connect(out).await;
4379
4380        deployment.start().await.unwrap();
4381
4382        // Send a=1, b=3, b=4
4383        ext_a.send(1).await.unwrap();
4384        ext_b.send(3).await.unwrap();
4385        ext_b.send(4).await.unwrap();
4386
4387        // Collect the first 3 elements
4388        let mut received = Vec::new();
4389        for _ in 0..3 {
4390            received.push(ext_out.next().await.unwrap());
4391        }
4392
4393        // Now send the delayed a=2
4394        ext_a.send(2).await.unwrap();
4395        received.push(ext_out.next().await.unwrap());
4396
4397        // All elements should be present
4398        received.sort();
4399        assert_eq!(received, vec![1, 2, 3, 4]);
4400    }
4401
4402    #[cfg(feature = "deploy")]
4403    #[tokio::test]
4404    async fn monotone_fold_threshold() {
4405        use crate::properties::manual_proof;
4406
4407        let mut deployment = Deployment::new();
4408
4409        let mut flow = FlowBuilder::new();
4410        let node = flow.process::<()>();
4411        let external = flow.external::<()>();
4412
4413        let in_unbounded: super::Stream<_, _> =
4414            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4415        let sum = in_unbounded.fold(
4416            q!(|| 0),
4417            q!(
4418                |sum, v| {
4419                    *sum += v;
4420                },
4421                monotone = manual_proof!(/** test */)
4422            ),
4423        );
4424
4425        let threshold_out = sum
4426            .threshold_greater_or_equal(node.singleton(q!(7)))
4427            .send_bincode_external(&external);
4428
4429        let nodes = flow
4430            .with_process(&node, deployment.Localhost())
4431            .with_external(&external, deployment.Localhost())
4432            .deploy(&mut deployment);
4433
4434        deployment.deploy().await.unwrap();
4435
4436        let mut threshold_out = nodes.connect(threshold_out).await;
4437
4438        deployment.start().await.unwrap();
4439
4440        assert_eq!(threshold_out.next().await.unwrap(), 7);
4441    }
4442
4443    #[cfg(feature = "deploy")]
4444    #[tokio::test]
4445    async fn monotone_count_threshold() {
4446        let mut deployment = Deployment::new();
4447
4448        let mut flow = FlowBuilder::new();
4449        let node = flow.process::<()>();
4450        let external = flow.external::<()>();
4451
4452        let in_unbounded: super::Stream<_, _> =
4453            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4454        let sum = in_unbounded.count();
4455
4456        let threshold_out = sum
4457            .threshold_greater_or_equal(node.singleton(q!(3)))
4458            .send_bincode_external(&external);
4459
4460        let nodes = flow
4461            .with_process(&node, deployment.Localhost())
4462            .with_external(&external, deployment.Localhost())
4463            .deploy(&mut deployment);
4464
4465        deployment.deploy().await.unwrap();
4466
4467        let mut threshold_out = nodes.connect(threshold_out).await;
4468
4469        deployment.start().await.unwrap();
4470
4471        assert_eq!(threshold_out.next().await.unwrap(), 3);
4472    }
4473
4474    #[cfg(feature = "deploy")]
4475    #[tokio::test]
4476    async fn monotone_map_order_preserving_threshold() {
4477        use crate::properties::manual_proof;
4478
4479        let mut deployment = Deployment::new();
4480
4481        let mut flow = FlowBuilder::new();
4482        let node = flow.process::<()>();
4483        let external = flow.external::<()>();
4484
4485        let in_unbounded: super::Stream<_, _> =
4486            node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4487        let sum = in_unbounded.fold(
4488            q!(|| 0),
4489            q!(
4490                |sum, v| {
4491                    *sum += v;
4492                },
4493                monotone = manual_proof!(/** test */)
4494            ),
4495        );
4496
4497        // map with order_preserving should preserve monotonicity
4498        let doubled = sum.map(q!(
4499            |v| v * 2,
4500            order_preserving = manual_proof!(/** doubling preserves order */)
4501        ));
4502
4503        let threshold_out = doubled
4504            .threshold_greater_or_equal(node.singleton(q!(14)))
4505            .send_bincode_external(&external);
4506
4507        let nodes = flow
4508            .with_process(&node, deployment.Localhost())
4509            .with_external(&external, deployment.Localhost())
4510            .deploy(&mut deployment);
4511
4512        deployment.deploy().await.unwrap();
4513
4514        let mut threshold_out = nodes.connect(threshold_out).await;
4515
4516        deployment.start().await.unwrap();
4517
4518        assert_eq!(threshold_out.next().await.unwrap(), 14);
4519    }
4520
4521    // === Compile-time type tests for join/cross_product ordering ===
4522
4523    #[cfg(any(feature = "deploy", feature = "sim"))]
4524    mod join_ordering_type_tests {
4525        use crate::live_collections::boundedness::{Bounded, Unbounded};
4526        use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
4527        use crate::location::{Location, Process};
4528
4529        #[expect(dead_code, reason = "compile-time type test")]
4530        fn join_unbounded_with_bounded_preserves_order<'a>(
4531            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4532            right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4533        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4534            left.join(right)
4535        }
4536
4537        #[expect(dead_code, reason = "compile-time type test")]
4538        fn join_unbounded_with_unbounded_is_no_order<'a>(
4539            left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4540            right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4541        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4542            left.join(right)
4543        }
4544
4545        #[expect(dead_code, reason = "compile-time type test")]
4546        fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
4547            left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4548            right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4549        ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
4550            left.join(right)
4551        }
4552
4553        #[expect(dead_code, reason = "compile-time type test")]
4554        fn join_unbounded_noorder_with_bounded<'a>(
4555            left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
4556            right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
4557        ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4558            left.join(right)
4559        }
4560
4561        // === Compile-time type tests for cross_product ordering ===
4562
4563        #[expect(dead_code, reason = "compile-time type test")]
4564        fn cross_product_unbounded_with_bounded_preserves_order<'a>(
4565            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4566            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4567        ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4568            left.cross_product(right)
4569        }
4570
4571        #[expect(dead_code, reason = "compile-time type test")]
4572        fn cross_product_bounded_with_bounded_preserves_order<'a>(
4573            left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4574            right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4575        ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
4576            left.cross_product(right)
4577        }
4578
4579        #[expect(dead_code, reason = "compile-time type test")]
4580        fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
4581            left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4582            right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4583        ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4584            left.cross_product(right)
4585        }
4586    } // mod join_ordering_type_tests
4587
4588    // === Runtime correctness tests for bounded join/cross_product ===
4589
4590    #[cfg(feature = "sim")]
4591    #[test]
4592    fn cross_product_mixed_boundedness_correctness() {
4593        use stageleft::q;
4594
4595        use crate::compile::builder::FlowBuilder;
4596        use crate::nondet::nondet;
4597
4598        let mut flow = FlowBuilder::new();
4599        let process = flow.process::<()>();
4600        let tick = process.tick();
4601
4602        let left = process.source_iter(q!(vec![1, 2]));
4603        let right = process
4604            .source_iter(q!(vec!['a', 'b']))
4605            .batch(&tick, nondet!(/** test */))
4606            .all_ticks();
4607
4608        let out = left.cross_product(right).sim_output();
4609
4610        flow.sim().exhaustive(async || {
4611            out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4612                .await;
4613        });
4614    }
4615
4616    #[cfg(feature = "sim")]
4617    #[test]
4618    fn join_mixed_boundedness_correctness() {
4619        use stageleft::q;
4620
4621        use crate::compile::builder::FlowBuilder;
4622        use crate::nondet::nondet;
4623
4624        let mut flow = FlowBuilder::new();
4625        let process = flow.process::<()>();
4626        let tick = process.tick();
4627
4628        let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4629        let right = process
4630            .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4631            .batch(&tick, nondet!(/** test */))
4632            .all_ticks();
4633
4634        let out = left.join(right).sim_output();
4635
4636        flow.sim().exhaustive(async || {
4637            out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4638                .await;
4639        });
4640    }
4641
4642    #[cfg(feature = "sim")]
4643    #[test]
4644    fn sim_merge_unordered_independent_atomics() {
4645        let mut flow = FlowBuilder::new();
4646        let node = flow.process::<()>();
4647
4648        let (in1_send, input1) = node.sim_input::<_, TotalOrder, _>();
4649        let (in2_send, input2) = node.sim_input::<_, TotalOrder, _>();
4650
4651        let out = input1
4652            .atomic()
4653            .merge_unordered(input2.atomic())
4654            .end_atomic()
4655            .sim_output();
4656
4657        flow.sim().exhaustive(async || {
4658            in1_send.send(1);
4659            in2_send.send(2);
4660
4661            out.assert_yields_only_unordered(vec![1, 2]).await;
4662        });
4663    }
4664
4665    #[cfg(feature = "deploy")]
4666    #[tokio::test]
4667    async fn test_stream_ref() {
4668        let mut deployment = Deployment::new();
4669
4670        let mut flow = FlowBuilder::new();
4671        let external = flow.external::<()>();
4672        let p1 = flow.process::<()>();
4673
4674        // Create a bounded stream (source_iter is bounded within a tick)
4675        let my_stream = p1.source_iter(q!(1..=5i32));
4676
4677        let stream_ref = my_stream.by_ref();
4678
4679        // Use the stream ref to get the vec's length
4680        let out_port = p1
4681            .source_iter(q!([()]))
4682            .map(q!(|_| stream_ref.len() as i32))
4683            .send_bincode_external(&external);
4684
4685        // Also consume the stream via pipe
4686        my_stream.for_each(q!(|_| {}));
4687
4688        let nodes = flow
4689            .with_default_optimize()
4690            .with_process(&p1, deployment.Localhost())
4691            .with_external(&external, deployment.Localhost())
4692            .deploy(&mut deployment);
4693
4694        deployment.deploy().await.unwrap();
4695
4696        let mut out_recv = nodes.connect(out_port).await;
4697
4698        deployment.start().await.unwrap();
4699
4700        let result = out_recv.next().await.unwrap();
4701        // stream has 5 elements
4702        assert_eq!(result, 5);
4703    }
4704
4705    #[cfg(feature = "deploy")]
4706    #[tokio::test]
4707    async fn test_stream_ref_contents() {
4708        let mut deployment = Deployment::new();
4709
4710        let mut flow = FlowBuilder::new();
4711        let external = flow.external::<()>();
4712        let p1 = flow.process::<()>();
4713
4714        // Create a bounded stream
4715        let my_stream = p1.source_iter(q!(1..=3i32));
4716
4717        let stream_ref = my_stream.by_ref();
4718
4719        // Sum the referenced vec's contents
4720        let out_port = p1
4721            .source_iter(q!([()]))
4722            .map(q!(|_| stream_ref.iter().sum::<i32>()))
4723            .send_bincode_external(&external);
4724
4725        my_stream.for_each(q!(|_| {}));
4726
4727        let nodes = flow
4728            .with_default_optimize()
4729            .with_process(&p1, deployment.Localhost())
4730            .with_external(&external, deployment.Localhost())
4731            .deploy(&mut deployment);
4732
4733        deployment.deploy().await.unwrap();
4734
4735        let mut out_recv = nodes.connect(out_port).await;
4736
4737        deployment.start().await.unwrap();
4738
4739        let result = out_recv.next().await.unwrap();
4740        // sum of 1+2+3 = 6
4741        assert_eq!(result, 6);
4742    }
4743
4744    #[cfg(feature = "deploy")]
4745    #[tokio::test]
4746    async fn test_stream_ref_no_consumer() {
4747        let mut deployment = Deployment::new();
4748
4749        let mut flow = FlowBuilder::new();
4750        let external = flow.external::<()>();
4751        let p1 = flow.process::<()>();
4752
4753        // Create a bounded stream — no pipe consumer, only ref
4754        let my_stream = p1.source_iter(q!(1..=4i32));
4755
4756        let stream_ref = my_stream.by_ref();
4757
4758        let out_port = p1
4759            .source_iter(q!([()]))
4760            .map(q!(|_| stream_ref.len() as i32))
4761            .send_bincode_external(&external);
4762
4763        let nodes = flow
4764            .with_default_optimize()
4765            .with_process(&p1, deployment.Localhost())
4766            .with_external(&external, deployment.Localhost())
4767            .deploy(&mut deployment);
4768
4769        deployment.deploy().await.unwrap();
4770
4771        let mut out_recv = nodes.connect(out_port).await;
4772
4773        deployment.start().await.unwrap();
4774
4775        let result = out_recv.next().await.unwrap();
4776        assert_eq!(result, 4);
4777    }
4778
4779    #[cfg(feature = "deploy")]
4780    #[tokio::test]
4781    async fn test_stream_mut() {
4782        let mut deployment = Deployment::new();
4783
4784        let mut flow = FlowBuilder::new();
4785        let external = flow.external::<()>();
4786        let p1 = flow.process::<()>();
4787
4788        // Create a bounded stream
4789        let my_stream = p1.source_iter(q!(1..=5i32));
4790
4791        let stream_mut = my_stream.by_mut();
4792
4793        // Mutably reference the buffer to retain only items > 3
4794        let out_port = p1
4795            .source_iter(q!([()]))
4796            .map(q!(|_| {
4797                stream_mut.retain(|x| *x > 3);
4798                stream_mut.len() as i32
4799            }))
4800            .send_bincode_external(&external);
4801
4802        my_stream.for_each(q!(|_| {}));
4803
4804        let nodes = flow
4805            .with_default_optimize()
4806            .with_process(&p1, deployment.Localhost())
4807            .with_external(&external, deployment.Localhost())
4808            .deploy(&mut deployment);
4809
4810        deployment.deploy().await.unwrap();
4811
4812        let mut out_recv = nodes.connect(out_port).await;
4813
4814        deployment.start().await.unwrap();
4815
4816        let result = out_recv.next().await.unwrap();
4817        // After retain(> 3): [4, 5] => len = 2
4818        assert_eq!(result, 2);
4819    }
4820
4821    /// A map with a mut singleton ref on an unordered input should produce > 1
4822    /// simulation instance because the ordering of elements through the mut closure
4823    /// is non-deterministic.
4824    #[cfg(feature = "sim")]
4825    #[test]
4826    fn sim_map_with_mut_on_unordered_explores_multiple_states() {
4827        use crate::live_collections::sliced::sliced;
4828        use crate::live_collections::stream::ExactlyOnce;
4829        use crate::properties::manual_proof;
4830
4831        let mut flow = FlowBuilder::new();
4832        let node = flow.process::<()>();
4833
4834        let (trigger_send, trigger) = node.sim_input::<i32, TotalOrder, ExactlyOnce>();
4835
4836        let out_recv = sliced! {
4837            let batch = use(trigger, nondet!(/** test */));
4838            let counter = batch.location().source_iter(q!(vec![0i32]))
4839                .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4840            let counter_mut = counter.by_mut();
4841            let items = batch.location().source_iter(q!(vec![1i32, 2])).weaken_ordering::<NoOrder>();
4842            items.map(q!(
4843                |x| {
4844                    *counter_mut += x;
4845                    *counter_mut
4846                },
4847                commutative = manual_proof!(/** test */)
4848            ))
4849        }
4850        .sim_output();
4851
4852        let count = flow.sim().exhaustive(async || {
4853            trigger_send.send(1);
4854            let _all: Vec<i32> = out_recv.collect_sorted().await;
4855        });
4856
4857        assert_eq!(
4858            count, 2,
4859            "Expected 2 simulation instances due to mut on unordered input, got {}",
4860            count
4861        );
4862    }
4863
4864    /// A map with a mut singleton ref on a top-level unordered input should produce > 1
4865    /// simulation instance. Currently panics because observe_nondet doesn't support
4866    /// top-level bounded inputs yet.
4867    #[cfg(feature = "sim")]
4868    #[test]
4869    #[ignore = "observe_nondet not yet supported for top-level bounded inputs (https://github.com/hydro-project/hydro/issues/2950)"]
4870    fn sim_map_with_mut_on_unordered_top_level() {
4871        use crate::properties::manual_proof;
4872
4873        let mut flow = FlowBuilder::new();
4874        let node = flow.process::<()>();
4875
4876        let counter = node
4877            .source_iter(q!(vec![0i32]))
4878            .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4879        let counter_mut = counter.by_mut();
4880
4881        let out_recv = node
4882            .source_iter(q!(vec![1i32, 2]))
4883            .weaken_ordering::<NoOrder>()
4884            .map(q!(
4885                |x| {
4886                    *counter_mut += x;
4887                    *counter_mut
4888                },
4889                commutative = manual_proof!(/** test */)
4890            ))
4891            .assume_ordering::<TotalOrder>(nondet!(/** test */))
4892            .sim_output();
4893
4894        counter.into_stream().for_each(q!(|_| {}));
4895
4896        let count = flow.sim().exhaustive(async || {
4897            let _all: Vec<i32> = out_recv.collect().await;
4898        });
4899
4900        assert_eq!(
4901            count, 2,
4902            "Expected 2 simulation instances due to mut on unordered input, got {}",
4903            count
4904        );
4905    }
4906}