1use std::cell::RefCell;
4use std::future::Future;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q, quote_type};
11#[cfg(feature = "tokio")]
12use tokio::time::Instant;
13
14use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
15use super::keyed_singleton::KeyedSingleton;
16use super::keyed_stream::{Generate, KeyedStream};
17use super::optional::Optional;
18use super::singleton::Singleton;
19use crate::compile::builder::{CycleId, FlowState};
20use crate::compile::ir::{
21 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, StreamOrder, StreamRetry,
22};
23#[cfg(stageleft_runtime)]
24use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
25use crate::forward_handle::{ForwardRef, TickCycle};
26use crate::live_collections::batch_atomic::BatchAtomic;
27use crate::live_collections::singleton::SingletonBound;
28#[cfg(stageleft_runtime)]
29use crate::location::dynamic::{DynLocation, LocationId};
30use crate::location::tick::{Atomic, DeferTick};
31use crate::location::{Location, Tick, TopLevel, check_matching_location};
32use crate::manual_expr::ManualExpr;
33use crate::nondet::{NonDet, nondet};
34use crate::prelude::manual_proof;
35use crate::properties::{
36 AggFuncAlgebra, ApplyMonotoneStream, StreamMapFuncAlgebra, ValidCommutativityFor,
37 ValidIdempotenceFor, ValidMutBorrowCommutativityFor, ValidMutBorrowIdempotenceFor,
38 ValidMutCommutativityFor, ValidMutIdempotenceFor,
39};
40
41pub mod networking;
42
43#[sealed::sealed]
45pub trait Ordering:
46 MinOrder<Self, Min = Self> + MinOrder<TotalOrder, Min = Self> + MinOrder<NoOrder, Min = NoOrder>
47{
48 const ORDERING_KIND: StreamOrder;
50}
51
52pub enum TotalOrder {}
56
57#[sealed::sealed]
58impl Ordering for TotalOrder {
59 const ORDERING_KIND: StreamOrder = StreamOrder::TotalOrder;
60}
61
62pub enum NoOrder {}
68
69#[sealed::sealed]
70impl Ordering for NoOrder {
71 const ORDERING_KIND: StreamOrder = StreamOrder::NoOrder;
72}
73
74#[sealed::sealed]
78pub trait WeakerOrderingThan<Other: ?Sized>: Ordering {}
79#[sealed::sealed]
80impl<O: Ordering, O2: Ordering> WeakerOrderingThan<O2> for O where O: MinOrder<O2, Min = O> {}
81
82#[sealed::sealed]
84pub trait MinOrder<Other: ?Sized> {
85 type Min: Ordering;
87}
88
89#[sealed::sealed]
90impl<O: Ordering> MinOrder<O> for TotalOrder {
91 type Min = O;
92}
93
94#[sealed::sealed]
95impl<O: Ordering> MinOrder<O> for NoOrder {
96 type Min = NoOrder;
97}
98
99#[sealed::sealed]
101pub trait Retries:
102 MinRetries<Self, Min = Self>
103 + MinRetries<ExactlyOnce, Min = Self>
104 + MinRetries<AtLeastOnce, Min = AtLeastOnce>
105{
106 const RETRIES_KIND: StreamRetry;
108}
109
110pub enum ExactlyOnce {}
113
114#[sealed::sealed]
115impl Retries for ExactlyOnce {
116 const RETRIES_KIND: StreamRetry = StreamRetry::ExactlyOnce;
117}
118
119pub enum AtLeastOnce {}
122
123#[sealed::sealed]
124impl Retries for AtLeastOnce {
125 const RETRIES_KIND: StreamRetry = StreamRetry::AtLeastOnce;
126}
127
128#[sealed::sealed]
132pub trait WeakerRetryThan<Other: ?Sized>: Retries {}
133#[sealed::sealed]
134impl<R: Retries, R2: Retries> WeakerRetryThan<R2> for R where R: MinRetries<R2, Min = R> {}
135
136#[sealed::sealed]
138pub trait MinRetries<Other: ?Sized> {
139 type Min: Retries + WeakerRetryThan<Self> + WeakerRetryThan<Other>;
141}
142
143#[sealed::sealed]
144impl<R: Retries> MinRetries<R> for ExactlyOnce {
145 type Min = R;
146}
147
148#[sealed::sealed]
149impl<R: Retries> MinRetries<R> for AtLeastOnce {
150 type Min = AtLeastOnce;
151}
152
153#[sealed::sealed]
154#[diagnostic::on_unimplemented(
155 message = "The input stream must be totally-ordered (`TotalOrder`), but has order `{Self}`. Strengthen the order upstream or consider a different API.",
156 label = "required here",
157 note = "To intentionally process the stream by observing a non-deterministic (shuffled) order of elements, use `.assume_ordering`. This introduces non-determinism so avoid unless necessary."
158)]
159pub trait IsOrdered: Ordering {}
161
162#[sealed::sealed]
163#[diagnostic::do_not_recommend]
164impl IsOrdered for TotalOrder {}
165
166#[sealed::sealed]
167#[diagnostic::on_unimplemented(
168 message = "The input stream must be exactly-once (`ExactlyOnce`), but has retries `{Self}`. Strengthen the retries guarantee upstream or consider a different API.",
169 label = "required here",
170 note = "To intentionally process the stream by observing non-deterministic (randomly duplicated) retries, use `.assume_retries`. This introduces non-determinism so avoid unless necessary."
171)]
172pub trait IsExactlyOnce: Retries {}
174
175#[sealed::sealed]
176#[diagnostic::do_not_recommend]
177impl IsExactlyOnce for ExactlyOnce {}
178
179pub struct Stream<
199 Type,
200 Loc,
201 Bound: Boundedness = Unbounded,
202 Order: Ordering = TotalOrder,
203 Retry: Retries = ExactlyOnce,
204> {
205 pub(crate) location: Loc,
206 pub(crate) ir_node: RefCell<HydroNode>,
207 pub(crate) flow_state: FlowState,
208
209 _phantom: PhantomData<(Type, Loc, Bound, Order, Retry)>,
210}
211
212impl<T, L, B: Boundedness, O: Ordering, R: Retries> Drop for Stream<T, L, B, O, R> {
213 fn drop(&mut self) {
214 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
215 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
216 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
217 input: Box::new(ir_node),
218 op_metadata: HydroIrOpMetadata::new(),
219 });
220 }
221 }
222}
223
224impl<'a, T, L, O: Ordering, R: Retries> From<Stream<T, L, Bounded, O, R>>
225 for Stream<T, L, Unbounded, O, R>
226where
227 L: Location<'a>,
228{
229 fn from(stream: Stream<T, L, Bounded, O, R>) -> Stream<T, L, Unbounded, O, R> {
230 let new_meta = stream
231 .location
232 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind());
233
234 Stream {
235 location: stream.location.clone(),
236 flow_state: stream.flow_state.clone(),
237 ir_node: RefCell::new(HydroNode::Cast {
238 inner: Box::new(stream.ir_node.replace(HydroNode::Placeholder)),
239 metadata: new_meta,
240 }),
241 _phantom: PhantomData,
242 }
243 }
244}
245
246impl<'a, T, L, B: Boundedness, R: Retries> From<Stream<T, L, B, TotalOrder, R>>
247 for Stream<T, L, B, NoOrder, R>
248where
249 L: Location<'a>,
250{
251 fn from(stream: Stream<T, L, B, TotalOrder, R>) -> Stream<T, L, B, NoOrder, R> {
252 stream.weaken_ordering()
253 }
254}
255
256impl<'a, T, L, B: Boundedness, O: Ordering> From<Stream<T, L, B, O, ExactlyOnce>>
257 for Stream<T, L, B, O, AtLeastOnce>
258where
259 L: Location<'a>,
260{
261 fn from(stream: Stream<T, L, B, O, ExactlyOnce>) -> Stream<T, L, B, O, AtLeastOnce> {
262 stream.weaken_retries()
263 }
264}
265
266impl<'a, T, L, O: Ordering, R: Retries> DeferTick for Stream<T, Tick<L>, Bounded, O, R>
267where
268 L: Location<'a>,
269{
270 fn defer_tick(self) -> Self {
271 Stream::defer_tick(self)
272 }
273}
274
275impl<'a, T, L, O: Ordering, R: Retries> CycleCollection<'a, TickCycle>
276 for Stream<T, Tick<L>, Bounded, O, R>
277where
278 L: Location<'a>,
279{
280 type Location = Tick<L>;
281
282 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
283 Stream::new(
284 location.clone(),
285 HydroNode::CycleSource {
286 cycle_id,
287 metadata: location.new_node_metadata(Self::collection_kind()),
288 },
289 )
290 }
291}
292
293impl<'a, T, L, O: Ordering, R: Retries> CycleCollectionWithInitial<'a, TickCycle>
294 for Stream<T, Tick<L>, Bounded, O, R>
295where
296 L: Location<'a>,
297{
298 type Location = Tick<L>;
299
300 fn location(&self) -> &Self::Location {
301 self.location()
302 }
303
304 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
305 let from_previous_tick: Stream<T, Tick<L>, Bounded, O, R> = Stream::new(
306 location.clone(),
307 HydroNode::DeferTick {
308 input: Box::new(HydroNode::CycleSource {
309 cycle_id,
310 metadata: location.new_node_metadata(Self::collection_kind()),
311 }),
312 metadata: location.new_node_metadata(Self::collection_kind()),
313 },
314 );
315
316 from_previous_tick.chain(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
317 }
318}
319
320impl<'a, T, L, O: Ordering, R: Retries> ReceiverComplete<'a, TickCycle>
321 for Stream<T, Tick<L>, Bounded, O, R>
322where
323 L: Location<'a>,
324{
325 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
326 assert_eq!(
327 Location::id(&self.location),
328 expected_location,
329 "locations do not match"
330 );
331 self.location
332 .flow_state()
333 .borrow_mut()
334 .push_root(HydroRoot::CycleSink {
335 cycle_id,
336 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
337 op_metadata: HydroIrOpMetadata::new(),
338 });
339 }
340}
341
342impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
343 for Stream<T, L, B, O, R>
344where
345 L: Location<'a>,
346{
347 type Location = L;
348
349 fn create_source(cycle_id: CycleId, location: L) -> Self {
350 Stream::new(
351 location.clone(),
352 HydroNode::CycleSource {
353 cycle_id,
354 metadata: location.new_node_metadata(Self::collection_kind()),
355 },
356 )
357 }
358}
359
360impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
361 for Stream<T, L, B, O, R>
362where
363 L: Location<'a>,
364{
365 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
366 assert_eq!(
367 Location::id(&self.location),
368 expected_location,
369 "locations do not match"
370 );
371 self.location
372 .flow_state()
373 .borrow_mut()
374 .push_root(HydroRoot::CycleSink {
375 cycle_id,
376 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
377 op_metadata: HydroIrOpMetadata::new(),
378 });
379 }
380}
381
382impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Clone for Stream<T, L, B, O, R>
383where
384 T: Clone,
385 L: Location<'a>,
386{
387 fn clone(&self) -> Self {
388 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
389 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
390 *self.ir_node.borrow_mut() = HydroNode::Tee {
391 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
392 metadata: self.location.new_node_metadata(Self::collection_kind()),
393 };
394 }
395
396 let HydroNode::Tee { inner, metadata } = &*self.ir_node.borrow() else {
397 unreachable!()
398 };
399 Stream {
400 location: self.location.clone(),
401 flow_state: self.flow_state.clone(),
402 ir_node: HydroNode::Tee {
403 inner: SharedNode(inner.0.clone()),
404 metadata: metadata.clone(),
405 }
406 .into(),
407 _phantom: PhantomData,
408 }
409 }
410}
411
412impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
413where
414 L: Location<'a>,
415{
416 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
417 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
418 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
419
420 let flow_state = location.flow_state().clone();
421 Stream {
422 location,
423 flow_state,
424 ir_node: RefCell::new(ir_node),
425 _phantom: PhantomData,
426 }
427 }
428
429 pub fn location(&self) -> &L {
431 &self.location
432 }
433
434 pub fn by_ref(&self) -> crate::handoff_ref::StreamRef<'a, '_, T, L>
439 where
440 B: IsBounded,
441 {
442 crate::handoff_ref::StreamRef::new(&self.ir_node)
443 }
444
445 pub fn by_mut(&self) -> crate::handoff_ref::StreamMut<'a, '_, T, L>
448 where
449 B: IsBounded,
450 {
451 crate::handoff_ref::StreamMut::new(&self.ir_node)
452 }
453
454 pub fn weaken_consistency(self) -> Stream<T, L::DropConsistency, B, O, R>
457 where
458 L: Location<'a>,
459 {
460 if L::consistency()
461 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
462 {
463 Stream::new(
465 self.location.drop_consistency(),
466 self.ir_node.replace(HydroNode::Placeholder),
467 )
468 } else {
469 Stream::new(
470 self.location.drop_consistency(),
471 HydroNode::Cast {
472 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
473 metadata: self.location.drop_consistency().new_node_metadata(Stream::<
474 T,
475 L::DropConsistency,
476 B,
477 O,
478 R,
479 >::collection_kind(
480 )),
481 },
482 )
483 }
484 }
485
486 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
490 self,
491 _proof: impl crate::properties::ConsistencyProof,
492 ) -> Stream<T, L2, B, O, R>
493 where
494 L: Location<'a>,
495 {
496 if L::consistency() == L2::consistency() {
497 Stream::new(
498 self.location.with_consistency_of(),
499 self.ir_node.replace(HydroNode::Placeholder),
500 )
501 } else {
502 Stream::new(
503 self.location.with_consistency_of(),
504 HydroNode::AssertIsConsistent {
505 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
506 trusted: false,
507 metadata: self
508 .location
509 .clone()
510 .with_consistency_of::<L2>()
511 .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
512 },
513 )
514 }
515 }
516
517 pub(crate) fn assert_has_consistency_of_trusted<
518 L2: Location<'a, DropConsistency = L::DropConsistency>,
519 >(
520 self,
521 _proof: impl crate::properties::ConsistencyProof,
522 ) -> Stream<T, L2, B, O, R>
523 where
524 L: Location<'a>,
525 {
526 if L::consistency() == L2::consistency() {
527 Stream::new(
528 self.location.with_consistency_of(),
529 self.ir_node.replace(HydroNode::Placeholder),
530 )
531 } else {
532 Stream::new(
533 self.location.with_consistency_of(),
534 HydroNode::AssertIsConsistent {
535 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
536 trusted: true,
537 metadata: self
538 .location
539 .clone()
540 .with_consistency_of::<L2>()
541 .new_node_metadata(Stream::<T, L2, B, O, R>::collection_kind()),
542 },
543 )
544 }
545 }
546
547 pub(crate) fn collection_kind() -> CollectionKind {
548 CollectionKind::Stream {
549 bound: B::BOUND_KIND,
550 order: O::ORDERING_KIND,
551 retry: R::RETRIES_KIND,
552 element_type: quote_type::<T>().into(),
553 }
554 }
555
556 pub fn map<U, F, C, I, const WAS_MUT: bool>(
576 self,
577 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, I>>,
578 ) -> Stream<U, L, B, O, R>
579 where
580 F: FnMut(T) -> U + 'a,
581 C: ValidMutCommutativityFor<F, T, U, O, WAS_MUT>,
582 I: ValidMutIdempotenceFor<F, T, U, R, WAS_MUT>,
583 {
584 let f = crate::handoff_ref::with_ref_capture(|| {
585 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
586 proof.register_proof(&expr);
587 expr.into()
588 });
589 Stream::new(
590 self.location.clone(),
591 HydroNode::Map {
592 f,
593 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
594 metadata: self
595 .location
596 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
597 },
598 )
599 }
600
601 pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
626 self,
627 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
628 ) -> Stream<U, L, B, O, R>
629 where
630 I: IntoIterator<Item = U>,
631 F: FnMut(T) -> I + 'a,
632 C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
633 Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
634 {
635 let f = crate::handoff_ref::with_ref_capture(|| {
636 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
637 proof.register_proof(&expr);
638 expr.into()
639 });
640 Stream::new(
641 self.location.clone(),
642 HydroNode::FlatMap {
643 f,
644 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
645 metadata: self
646 .location
647 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
648 },
649 )
650 }
651
652 pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
679 self,
680 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
681 ) -> Stream<U, L, B, NoOrder, R>
682 where
683 I: IntoIterator<Item = U>,
684 F: FnMut(T) -> I + 'a,
685 C: ValidMutCommutativityFor<F, T, I, O, WAS_MUT>,
686 Idemp: ValidMutIdempotenceFor<F, T, I, R, WAS_MUT>,
687 {
688 let f = crate::handoff_ref::with_ref_capture(|| {
689 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
690 proof.register_proof(&expr);
691 expr.into()
692 });
693 Stream::new(
694 self.location.clone(),
695 HydroNode::FlatMap {
696 f,
697 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
698 metadata: self
699 .location
700 .new_node_metadata(Stream::<U, L, B, NoOrder, R>::collection_kind()),
701 },
702 )
703 }
704
705 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, O, R>
728 where
729 T: IntoIterator<Item = U>,
730 {
731 self.flat_map_ordered(q!(|d| d))
732 }
733
734 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, R>
761 where
762 T: IntoIterator<Item = U>,
763 {
764 self.flat_map_unordered(q!(|d| d))
765 }
766
767 pub fn flat_map_stream_blocking<U, S, F, C, Idemp, const WAS_MUT: bool>(
771 self,
772 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
773 ) -> Stream<U, L, B, O, R>
774 where
775 S: futures::Stream<Item = U>,
776 F: FnMut(T) -> S + 'a,
777 C: ValidMutCommutativityFor<F, T, S, O, WAS_MUT>,
778 Idemp: ValidMutIdempotenceFor<F, T, S, R, WAS_MUT>,
779 {
780 let f = crate::handoff_ref::with_ref_capture(|| {
781 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
782 proof.register_proof(&expr);
783 expr.into()
784 });
785 Stream::new(
786 self.location.clone(),
787 HydroNode::FlatMapStreamBlocking {
788 f,
789 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
790 metadata: self
791 .location
792 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
793 },
794 )
795 }
796
797 pub fn flatten_stream_blocking<U>(self) -> Stream<U, L, B, O, R>
801 where
802 T: futures::Stream<Item = U>,
803 {
804 self.flat_map_stream_blocking(q!(|d| d))
805 }
806
807 pub fn filter<F, C, Idemp, const WAS_MUT: bool>(
832 self,
833 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
834 ) -> Self
835 where
836 F: FnMut(&T) -> bool + 'a,
837 C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
838 Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
839 {
840 let f = crate::handoff_ref::with_ref_capture(|| {
841 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
842 proof.register_proof(&expr);
843 expr.into()
844 });
845 Stream::new(
846 self.location.clone(),
847 HydroNode::Filter {
848 f,
849 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
850 metadata: self.location.new_node_metadata(Self::collection_kind()),
851 },
852 )
853 }
854
855 pub fn partition<F, C, Idemp, const WAS_MUT: bool>(
890 self,
891 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
892 ) -> (Stream<T, L, B, O, R>, Stream<T, L, B, O, R>)
893 where
894 F: FnMut(&T) -> bool + 'a,
895 C: ValidMutBorrowCommutativityFor<F, T, bool, O, WAS_MUT>,
896 Idemp: ValidMutBorrowIdempotenceFor<F, T, bool, R, WAS_MUT>,
897 {
898 let f = crate::handoff_ref::with_ref_capture(|| {
899 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location);
900 proof.register_proof(&expr);
901 expr.into()
902 });
903 let shared = SharedNode(Rc::new(RefCell::new(
904 self.ir_node.replace(HydroNode::Placeholder),
905 )));
906
907 let true_stream = Stream::new(
908 self.location.clone(),
909 HydroNode::Partition {
910 inner: SharedNode(shared.0.clone()),
911 f: f.clone(),
912 is_true: true,
913 metadata: self.location.new_node_metadata(Self::collection_kind()),
914 },
915 );
916
917 let false_stream = Stream::new(
918 self.location.clone(),
919 HydroNode::Partition {
920 inner: SharedNode(shared.0),
921 f,
922 is_true: false,
923 metadata: self.location.new_node_metadata(Self::collection_kind()),
924 },
925 );
926
927 (true_stream, false_stream)
928 }
929
930 pub fn filter_map<U, F, C, Idemp, const WAS_MUT: bool>(
950 self,
951 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
952 ) -> Stream<U, L, B, O, R>
953 where
954 F: FnMut(T) -> Option<U> + 'a,
955 C: ValidMutCommutativityFor<F, T, Option<U>, O, WAS_MUT>,
956 Idemp: ValidMutIdempotenceFor<F, T, Option<U>, R, WAS_MUT>,
957 {
958 let f = crate::handoff_ref::with_ref_capture(|| {
959 let (expr, proof) = f.splice_fnmut1_ctx_props(&self.location);
960 proof.register_proof(&expr);
961 expr.into()
962 });
963 Stream::new(
964 self.location.clone(),
965 HydroNode::FilterMap {
966 f,
967 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
968 metadata: self
969 .location
970 .new_node_metadata(Stream::<U, L, B, O, R>::collection_kind()),
971 },
972 )
973 }
974
975 pub fn cross_singleton<O2>(
1000 self,
1001 other: impl Into<Optional<O2, L, Bounded>>,
1002 ) -> Stream<(T, O2), L, B, O, R>
1003 where
1004 O2: Clone,
1005 {
1006 let other: Optional<O2, L, Bounded> = other.into();
1007 check_matching_location(&self.location, &other.location);
1008
1009 Stream::new(
1010 self.location.clone(),
1011 HydroNode::CrossSingleton {
1012 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1013 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1014 metadata: self
1015 .location
1016 .new_node_metadata(Stream::<(T, O2), L, B, O, R>::collection_kind()),
1017 },
1018 )
1019 }
1020
1021 pub fn filter_if(self, signal: Singleton<bool, L, Bounded>) -> Stream<T, L, B, O, R> {
1053 self.cross_singleton(signal.filter(q!(|b| *b)))
1054 .map(q!(|(d, _)| d))
1055 }
1056
1057 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1092 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1093 self.filter_if(signal.is_some())
1094 }
1095
1096 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1131 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Stream<T, L, B, O, R> {
1132 self.filter_if(other.is_none())
1133 }
1134
1135 pub fn cross_product<T2, B2: Boundedness, O2: Ordering, R2: Retries>(
1160 self,
1161 other: Stream<T2, L, B2, O2, R2>,
1162 ) -> Stream<(T, T2), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
1163 where
1164 T: Clone,
1165 T2: Clone,
1166 R: MinRetries<R2>,
1167 {
1168 self.map(q!(|v| ((), v)))
1169 .join(other.map(q!(|v| ((), v))))
1170 .map(q!(|((), (v1, v2))| (v1, v2)))
1171 }
1172
1173 pub fn unique(self) -> Stream<T, L, B, O, ExactlyOnce>
1192 where
1193 T: Eq + Hash,
1194 {
1195 Stream::new(
1196 self.location.clone(),
1197 HydroNode::Unique {
1198 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1199 metadata: self
1200 .location
1201 .new_node_metadata(Stream::<T, L, B, O, ExactlyOnce>::collection_kind()),
1202 },
1203 )
1204 }
1205
1206 pub fn filter_not_in<O2: Ordering, B2>(self, other: Stream<T, L, B2, O2, R>) -> Self
1232 where
1233 T: Eq + Hash,
1234 B2: IsBounded,
1235 {
1236 check_matching_location(&self.location, &other.location);
1237
1238 Stream::new(
1239 self.location.clone(),
1240 HydroNode::Difference {
1241 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1242 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1243 metadata: self
1244 .location
1245 .new_node_metadata(Stream::<T, L, Bounded, O, R>::collection_kind()),
1246 },
1247 )
1248 }
1249
1250 pub fn inspect<F, C, Idemp, const WAS_MUT: bool>(
1271 self,
1272 f: impl IntoQuotedMut<'a, F, L::DropConsistency, StreamMapFuncAlgebra<C, Idemp>>,
1273 ) -> Self
1274 where
1275 F: FnMut(&T) + 'a,
1276 C: ValidMutBorrowCommutativityFor<F, T, (), O, WAS_MUT>,
1277 Idemp: ValidMutBorrowIdempotenceFor<F, T, (), R, WAS_MUT>,
1278 {
1279 let f = crate::handoff_ref::with_ref_capture(|| {
1280 let (expr, proof) = f.splice_fnmut1_borrow_ctx_props(&self.location.drop_consistency());
1281 proof.register_proof(&expr);
1282 expr.into()
1283 });
1284
1285 Stream::new(
1286 self.location.clone(),
1287 HydroNode::Inspect {
1288 f,
1289 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1290 metadata: self.location.new_node_metadata(Self::collection_kind()),
1291 },
1292 )
1293 }
1294
1295 pub fn for_each<F: FnMut(T) + 'a>(self, f: impl IntoQuotedMut<'a, F, L>)
1306 where
1307 O: IsOrdered,
1308 R: IsExactlyOnce,
1309 {
1310 let f = crate::handoff_ref::with_ref_capture(|| f.splice_fnmut1_ctx(&self.location).into());
1311 self.location
1312 .flow_state()
1313 .borrow_mut()
1314 .push_root(HydroRoot::ForEach {
1315 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1316 f,
1317 op_metadata: HydroIrOpMetadata::new(),
1318 });
1319 }
1320
1321 pub fn dest_sink<S>(self, sink: impl QuotedWithContext<'a, S, L>)
1327 where
1328 O: IsOrdered,
1329 R: IsExactlyOnce,
1330 S: 'a + futures::Sink<T> + Unpin,
1331 {
1332 self.location
1333 .flow_state()
1334 .borrow_mut()
1335 .push_root(HydroRoot::DestSink {
1336 sink: sink.splice_typed_ctx(&self.location).into(),
1337 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1338 op_metadata: HydroIrOpMetadata::new(),
1339 });
1340 }
1341
1342 pub fn enumerate(self) -> Stream<(usize, T), L, B, O, R>
1362 where
1363 O: IsOrdered,
1364 R: IsExactlyOnce,
1365 {
1366 Stream::new(
1367 self.location.clone(),
1368 HydroNode::Enumerate {
1369 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1370 metadata: self.location.new_node_metadata(Stream::<
1371 (usize, T),
1372 L,
1373 B,
1374 TotalOrder,
1375 ExactlyOnce,
1376 >::collection_kind()),
1377 },
1378 )
1379 }
1380
1381 pub fn fold<A, I, F, C, Idemp, M, B2: SingletonBound>(
1405 self,
1406 init: impl IntoQuotedMut<'a, I, L>,
1407 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp, M>>,
1408 ) -> Singleton<A, L, B2>
1409 where
1410 I: Fn() -> A + 'a,
1411 F: 'a + Fn(&mut A, T),
1412 C: ValidCommutativityFor<O>,
1413 Idemp: ValidIdempotenceFor<R>,
1414 B: ApplyMonotoneStream<M, B2>,
1415 {
1416 let init = init.splice_fn0_ctx(&self.location).into();
1417 let (comb, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1418 proof.register_proof(&comb);
1419
1420 let nondet = nondet!();
1423 let retried: Stream<T, L::DropConsistency, B, O, ExactlyOnce> = self.assume_retries(nondet);
1424
1425 let core = HydroNode::Fold {
1426 init,
1427 acc: comb.into(),
1428 input: Box::new(retried.ir_node.replace(HydroNode::Placeholder)),
1429 metadata: retried
1430 .location
1431 .new_node_metadata(Singleton::<A, L::DropConsistency, B2>::collection_kind()),
1432 };
1437
1438 Singleton::new(retried.location.clone(), core)
1439 .assert_has_consistency_of(manual_proof!())
1440 }
1441
1442 pub fn reduce<F, C, Idemp>(
1465 self,
1466 comb: impl IntoQuotedMut<'a, F, L, AggFuncAlgebra<C, Idemp>>,
1467 ) -> Optional<T, L, B>
1468 where
1469 F: Fn(&mut T, T) + 'a,
1470 C: ValidCommutativityFor<O>,
1471 Idemp: ValidIdempotenceFor<R>,
1472 {
1473 let (f, proof) = comb.splice_fn2_borrow_mut_ctx_props(&self.location);
1474 proof.register_proof(&f);
1475
1476 let nondet = nondet!();
1477 let ordered_etc: Stream<T, L::DropConsistency, B> =
1478 self.assume_retries(nondet).assume_ordering(nondet);
1479
1480 let core = HydroNode::Reduce {
1481 f: f.into(),
1482 input: Box::new(ordered_etc.ir_node.replace(HydroNode::Placeholder)),
1483 metadata: ordered_etc
1484 .location
1485 .new_node_metadata(Optional::<T, L::DropConsistency, B>::collection_kind()),
1486 };
1487
1488 Optional::new(ordered_etc.location.clone(), core)
1489 .assert_has_consistency_of(manual_proof!())
1490 }
1491
1492 pub fn max(self) -> Optional<T, L, B>
1512 where
1513 T: Ord,
1514 {
1515 self.assume_retries_trusted::<ExactlyOnce>(nondet!())
1516 .assume_ordering_trusted_bounded::<TotalOrder>(
1517 nondet!(),
1518 )
1519 .reduce(q!(|curr, new| {
1520 if new > *curr {
1521 *curr = new;
1522 }
1523 }))
1524 }
1525
1526 pub fn min(self) -> Optional<T, L, B>
1546 where
1547 T: Ord,
1548 {
1549 self.assume_retries_trusted::<ExactlyOnce>(nondet!())
1550 .assume_ordering_trusted_bounded::<TotalOrder>(
1551 nondet!(),
1552 )
1553 .reduce(q!(|curr, new| {
1554 if new < *curr {
1555 *curr = new;
1556 }
1557 }))
1558 }
1559
1560 pub fn first(self) -> Optional<T, L, B>
1583 where
1584 O: IsOrdered,
1585 {
1586 self.make_totally_ordered()
1587 .assume_retries_trusted::<ExactlyOnce>(nondet!())
1588 .generator(q!(|| ()), q!(|_, item| Generate::Return(item)))
1589 .reduce(q!(|_, _| {}))
1590 }
1591
1592 pub fn last(self) -> Optional<T, L, B>
1615 where
1616 O: IsOrdered,
1617 {
1618 self.make_totally_ordered()
1619 .assume_retries_trusted::<ExactlyOnce>(nondet!())
1620 .reduce(q!(|curr, new| *curr = new))
1621 }
1622
1623 pub fn limit(
1646 self,
1647 n: impl QuotedWithContext<'a, usize, L> + Copy + 'a,
1648 ) -> Stream<T, L, B, TotalOrder, ExactlyOnce>
1649 where
1650 O: IsOrdered,
1651 R: IsExactlyOnce,
1652 {
1653 self.generator(
1654 q!(|| 0usize),
1655 q!(move |count, item| {
1656 if *count == n {
1657 Generate::Break
1658 } else {
1659 *count += 1;
1660 if *count == n {
1661 Generate::Return(item)
1662 } else {
1663 Generate::Yield(item)
1664 }
1665 }
1666 }),
1667 )
1668 }
1669
1670 pub fn collect_vec(self) -> Singleton<Vec<T>, L, B>
1696 where
1697 O: IsOrdered,
1698 R: IsExactlyOnce,
1699 {
1700 self.make_totally_ordered().make_exactly_once().fold(
1701 q!(|| vec![]),
1702 q!(|acc, v| {
1703 acc.push(v);
1704 }),
1705 )
1706 }
1707
1708 pub fn scan<A, U, I, F>(
1769 self,
1770 init: impl IntoQuotedMut<'a, I, L>,
1771 f: impl IntoQuotedMut<'a, F, L>,
1772 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1773 where
1774 O: IsOrdered,
1775 R: IsExactlyOnce,
1776 I: Fn() -> A + 'a,
1777 F: Fn(&mut A, T) -> Option<U> + 'a,
1778 {
1779 let init = init.splice_fn0_ctx(&self.location).into();
1780 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1781
1782 Stream::new(
1783 self.location.clone(),
1784 HydroNode::Scan {
1785 init,
1786 acc: f,
1787 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1788 metadata: self.location.new_node_metadata(
1789 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1790 ),
1791 },
1792 )
1793 }
1794
1795 pub fn scan_async_blocking<A, U, I, F, Fut>(
1829 self,
1830 init: impl IntoQuotedMut<'a, I, L>,
1831 f: impl IntoQuotedMut<'a, F, L>,
1832 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1833 where
1834 O: IsOrdered,
1835 R: IsExactlyOnce,
1836 I: Fn() -> A + 'a,
1837 F: Fn(&mut A, T) -> Fut + 'a,
1838 Fut: Future<Output = Option<U>> + 'a,
1839 {
1840 let init = init.splice_fn0_ctx(&self.location).into();
1841 let f = f.splice_fn2_borrow_mut_ctx(&self.location).into();
1842
1843 Stream::new(
1844 self.location.clone(),
1845 HydroNode::ScanAsyncBlocking {
1846 init,
1847 acc: f,
1848 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1849 metadata: self.location.new_node_metadata(
1850 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
1851 ),
1852 },
1853 )
1854 }
1855
1856 pub fn generator<A, U, I, F>(
1896 self,
1897 init: impl IntoQuotedMut<'a, I, L> + Copy,
1898 f: impl IntoQuotedMut<'a, F, L> + Copy,
1899 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
1900 where
1901 O: IsOrdered,
1902 R: IsExactlyOnce,
1903 I: Fn() -> A + 'a,
1904 F: Fn(&mut A, T) -> Generate<U> + 'a,
1905 {
1906 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
1907 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
1908
1909 let this = self.make_totally_ordered().make_exactly_once();
1910
1911 let scan_init = q!(|| None)
1916 .splice_fn0_ctx::<Option<Option<A>>>(&this.location)
1917 .into();
1918 let scan_f = q!(move |state: &mut Option<Option<_>>, v| {
1919 if state.is_none() {
1920 *state = Some(Some(init()));
1921 }
1922 match state {
1923 Some(Some(state_value)) => match f(state_value, v) {
1924 Generate::Yield(out) => Some(Some(out)),
1925 Generate::Return(out) => {
1926 *state = Some(None);
1927 Some(Some(out))
1928 }
1929 Generate::Break => None,
1933 Generate::Continue => Some(None),
1934 },
1935 _ => None,
1937 }
1938 })
1939 .splice_fn2_borrow_mut_ctx::<Option<Option<A>>, T, _>(&this.location)
1940 .into();
1941
1942 let scan_node = HydroNode::Scan {
1943 init: scan_init,
1944 acc: scan_f,
1945 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
1946 metadata: this.location.new_node_metadata(Stream::<
1947 Option<U>,
1948 L,
1949 B,
1950 TotalOrder,
1951 ExactlyOnce,
1952 >::collection_kind()),
1953 };
1954
1955 let flatten_f = q!(|d| d)
1956 .splice_fn1_ctx::<Option<U>, _>(&this.location)
1957 .into();
1958 let flatten_node = HydroNode::FlatMap {
1959 f: flatten_f,
1960 input: Box::new(scan_node),
1961 metadata: this
1962 .location
1963 .new_node_metadata(Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind()),
1964 };
1965
1966 Stream::new(this.location.clone(), flatten_node)
1967 }
1968
1969 #[cfg(feature = "tokio")]
1978 pub fn sample_every(
1979 self,
1980 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1981 nondet: NonDet,
1982 ) -> Stream<T, L::DropConsistency, Unbounded, O, AtLeastOnce>
1983 where
1984 L: TopLevel<'a>,
1985 {
1986 let samples = self.location.source_interval(interval);
1987
1988 let tick = self.location.tick();
1989 self.batch(&tick, nondet)
1990 .filter_if(samples.batch(&tick, nondet).first().is_some())
1991 .all_ticks()
1992 .weaken_retries()
1993 }
1994
1995 #[cfg(feature = "tokio")]
2005 pub fn timeout(
2006 self,
2007 duration: impl QuotedWithContext<'a, std::time::Duration, Tick<L::DropConsistency>> + Copy + 'a,
2008 nondet: NonDet,
2009 ) -> Optional<(), L::DropConsistency, Unbounded>
2010 where
2011 L: TopLevel<'a>,
2012 {
2013 let tick = self.location.tick();
2014
2015 let latest_received = self.assume_retries::<ExactlyOnce>(nondet).fold(
2016 q!(|| None),
2017 q!(
2018 |latest, _| {
2019 *latest = Some(Instant::now());
2020 },
2021 commutative = manual_proof!()
2022 ),
2023 );
2024
2025 latest_received
2026 .snapshot(&tick, nondet)
2027 .filter_map(q!(move |latest_received| {
2028 if let Some(latest_received) = latest_received {
2029 if Instant::now().duration_since(latest_received) > duration {
2030 Some(())
2031 } else {
2032 None
2033 }
2034 } else {
2035 Some(())
2036 }
2037 }))
2038 .latest()
2039 }
2040
2041 pub fn atomic(self) -> Stream<T, Atomic<L>, B, O, R> {
2047 let id = self.location.flow_state().borrow_mut().next_clock_id();
2048 let out_location = Atomic {
2049 tick: Tick {
2050 id,
2051 l: self.location.clone(),
2052 },
2053 };
2054 Stream::new(
2055 out_location.clone(),
2056 HydroNode::BeginAtomic {
2057 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2058 metadata: out_location
2059 .new_node_metadata(Stream::<T, Atomic<L>, B, O, R>::collection_kind()),
2060 },
2061 )
2062 }
2063
2064 pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
2072 self,
2073 tick: &Tick<L2>,
2074 _nondet: NonDet,
2075 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
2076 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
2077 Stream::new(
2078 tick.drop_consistency(),
2079 HydroNode::Batch {
2080 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2081 metadata: tick
2082 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
2083 },
2084 )
2085 }
2086
2087 pub fn ir_node_named(self, name: &str) -> Stream<T, L, B, O, R> {
2090 {
2091 let mut node = self.ir_node.borrow_mut();
2092 let metadata = node.metadata_mut();
2093 metadata.tag = Some(name.to_owned());
2094 }
2095 self
2096 }
2097
2098 pub(crate) fn cast_at_most_one_element(self) -> Optional<T, L, B>
2102 where
2103 B: IsBounded,
2104 {
2105 Optional::new(
2106 self.location.clone(),
2107 HydroNode::Cast {
2108 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2109 metadata: self
2110 .location
2111 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
2112 },
2113 )
2114 }
2115
2116 pub(crate) fn use_ordering_type<O2: Ordering>(self) -> Stream<T, L, B, O2, R> {
2117 if O::ORDERING_KIND == O2::ORDERING_KIND {
2118 Stream::new(
2119 self.location.clone(),
2120 self.ir_node.replace(HydroNode::Placeholder),
2121 )
2122 } else {
2123 panic!(
2124 "Runtime ordering {:?} did not match requested cast {:?}.",
2125 O::ORDERING_KIND,
2126 O2::ORDERING_KIND
2127 )
2128 }
2129 }
2130
2131 pub fn assume_ordering<O2: Ordering>(
2140 self,
2141 _nondet: NonDet,
2142 ) -> Stream<T, L::DropConsistency, B, O2, R> {
2143 if O::ORDERING_KIND == O2::ORDERING_KIND {
2144 self.use_ordering_type().weaken_consistency()
2145 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2146 let target_location = self.location().drop_consistency();
2148 Stream::new(
2149 target_location.clone(),
2150 HydroNode::Cast {
2151 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2152 metadata: target_location
2153 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2154 },
2155 )
2156 } else {
2157 let target_location = self.location().drop_consistency();
2158 Stream::new(
2159 target_location.clone(),
2160 HydroNode::ObserveNonDet {
2161 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2162 trusted: false,
2163 metadata: target_location
2164 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2165 },
2166 )
2167 }
2168 }
2169
2170 fn assume_ordering_trusted_bounded<O2: Ordering>(
2173 self,
2174 nondet: NonDet,
2175 ) -> Stream<T, L, B, O2, R> {
2176 if B::BOUNDED {
2177 self.assume_ordering_trusted(nondet)
2178 } else {
2179 let self_location = self.location.clone();
2180 let inner: Stream<T, L::DropConsistency, B, O2, R> = self.assume_ordering(nondet);
2181 Stream::new(self_location, inner.ir_node.replace(HydroNode::Placeholder))
2182 }
2183 }
2184
2185 pub(crate) fn assume_ordering_trusted<O2: Ordering>(
2188 self,
2189 _nondet: NonDet,
2190 ) -> Stream<T, L, B, O2, R> {
2191 if O::ORDERING_KIND == O2::ORDERING_KIND {
2192 self.use_ordering_type()
2193 } else if O2::ORDERING_KIND == StreamOrder::NoOrder {
2194 Stream::new(
2196 self.location.clone(),
2197 HydroNode::Cast {
2198 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2199 metadata: self
2200 .location
2201 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2202 },
2203 )
2204 } else {
2205 Stream::new(
2206 self.location.clone(),
2207 HydroNode::ObserveNonDet {
2208 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2209 trusted: true,
2210 metadata: self
2211 .location
2212 .new_node_metadata(Stream::<T, L, B, O2, R>::collection_kind()),
2213 },
2214 )
2215 }
2216 }
2217
2218 #[deprecated = "use `weaken_ordering::<NoOrder>()` instead"]
2219 pub fn weakest_ordering(self) -> Stream<T, L, B, NoOrder, R> {
2222 self.weaken_ordering::<NoOrder>()
2223 }
2224
2225 pub fn weaken_ordering<O2: WeakerOrderingThan<O>>(self) -> Stream<T, L, B, O2, R> {
2228 let nondet = nondet!();
2229 self.assume_ordering_trusted::<O2>(nondet)
2230 }
2231
2232 pub fn make_totally_ordered(self) -> Stream<T, L, B, TotalOrder, R>
2235 where
2236 O: IsOrdered,
2237 {
2238 self.assume_ordering_trusted(nondet!())
2239 }
2240
2241 pub fn assume_retries<R2: Retries>(
2250 self,
2251 _nondet: NonDet,
2252 ) -> Stream<T, L::DropConsistency, B, O, R2> {
2253 if R::RETRIES_KIND == R2::RETRIES_KIND {
2254 Stream::new(
2255 self.location.drop_consistency(),
2256 self.ir_node.replace(HydroNode::Placeholder),
2257 )
2258 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2259 let target_location = self.location.drop_consistency();
2261 Stream::new(
2262 target_location.clone(),
2263 HydroNode::Cast {
2264 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2265 metadata: target_location
2266 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2267 },
2268 )
2269 } else {
2270 let target_location = self.location.drop_consistency();
2271 Stream::new(
2272 target_location.clone(),
2273 HydroNode::ObserveNonDet {
2274 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2275 trusted: false,
2276 metadata: target_location
2277 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2278 },
2279 )
2280 }
2281 }
2282
2283 fn assume_retries_trusted<R2: Retries>(self, _nondet: NonDet) -> Stream<T, L, B, O, R2> {
2286 if R::RETRIES_KIND == R2::RETRIES_KIND {
2287 Stream::new(
2288 self.location.clone(),
2289 self.ir_node.replace(HydroNode::Placeholder),
2290 )
2291 } else if R2::RETRIES_KIND == StreamRetry::AtLeastOnce {
2292 Stream::new(
2294 self.location.clone(),
2295 HydroNode::Cast {
2296 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2297 metadata: self
2298 .location
2299 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2300 },
2301 )
2302 } else {
2303 Stream::new(
2304 self.location.clone(),
2305 HydroNode::ObserveNonDet {
2306 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2307 trusted: true,
2308 metadata: self
2309 .location
2310 .new_node_metadata(Stream::<T, L, B, O, R2>::collection_kind()),
2311 },
2312 )
2313 }
2314 }
2315
2316 #[deprecated = "use `weaken_retries::<AtLeastOnce>()` instead"]
2317 pub fn weakest_retries(self) -> Stream<T, L, B, O, AtLeastOnce> {
2320 self.weaken_retries::<AtLeastOnce>()
2321 }
2322
2323 pub fn weaken_retries<R2: WeakerRetryThan<R>>(self) -> Stream<T, L, B, O, R2> {
2326 let nondet = nondet!();
2327 self.assume_retries_trusted::<R2>(nondet)
2328 }
2329
2330 pub fn make_exactly_once(self) -> Stream<T, L, B, O, ExactlyOnce>
2333 where
2334 R: IsExactlyOnce,
2335 {
2336 self.assume_retries_trusted(nondet!())
2337 }
2338
2339 pub fn make_bounded(self) -> Stream<T, L, Bounded, O, R>
2342 where
2343 B: IsBounded,
2344 {
2345 self.weaken_boundedness()
2346 }
2347
2348 pub fn weaken_boundedness<B2: Boundedness>(self) -> Stream<T, L, B2, O, R> {
2351 if B::BOUNDED == B2::BOUNDED {
2352 Stream::new(
2353 self.location.clone(),
2354 self.ir_node.replace(HydroNode::Placeholder),
2355 )
2356 } else {
2357 Stream::new(
2359 self.location.clone(),
2360 HydroNode::Cast {
2361 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2362 metadata: self
2363 .location
2364 .new_node_metadata(Stream::<T, L, B2, O, R>::collection_kind()),
2365 },
2366 )
2367 }
2368 }
2369}
2370
2371impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<&T, L, B, O, R>
2372where
2373 L: Location<'a>,
2374{
2375 pub fn cloned(self) -> Stream<T, L, B, O, R>
2393 where
2394 T: Clone,
2395 {
2396 self.map(q!(|d| d.clone()))
2397 }
2398}
2399
2400impl<'a, T, L, B: Boundedness, O: Ordering> Stream<T, L, B, O, ExactlyOnce>
2401where
2402 L: Location<'a>,
2403{
2404 pub fn count(self) -> Singleton<usize, L, B::StreamToMonotone> {
2423 self.assume_ordering_trusted::<TotalOrder>(nondet!(
2424 ))
2426 .fold(
2427 q!(|| 0usize),
2428 q!(
2429 |count, _| *count += 1,
2430 monotone = manual_proof!()
2431 ),
2432 )
2433 }
2434}
2435
2436impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Stream<T, L, Unbounded, O, R> {
2437 pub fn merge_unordered<O2: Ordering, R2: Retries>(
2461 self,
2462 other: Stream<T, L, Unbounded, O2, R2>,
2463 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2464 where
2465 R: MinRetries<R2>,
2466 {
2467 Stream::new(
2468 self.location.clone(),
2469 HydroNode::Chain {
2470 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2471 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2472 metadata: self.location.new_node_metadata(Stream::<
2473 T,
2474 L,
2475 Unbounded,
2476 NoOrder,
2477 <R as MinRetries<R2>>::Min,
2478 >::collection_kind()),
2479 },
2480 )
2481 }
2482
2483 #[deprecated(note = "use `merge_unordered` instead")]
2485 pub fn interleave<O2: Ordering, R2: Retries>(
2486 self,
2487 other: Stream<T, L, Unbounded, O2, R2>,
2488 ) -> Stream<T, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
2489 where
2490 R: MinRetries<R2>,
2491 {
2492 self.merge_unordered(other)
2493 }
2494}
2495
2496impl<'a, T, L: Location<'a>, B: Boundedness, R: Retries> Stream<T, L, B, TotalOrder, R> {
2497 pub fn merge_ordered<R2: Retries>(
2525 self,
2526 other: Stream<T, L, B, TotalOrder, R2>,
2527 _nondet: NonDet,
2528 ) -> Stream<T, L::DropConsistency, B, TotalOrder, <R as MinRetries<R2>>::Min>
2529 where
2530 R: MinRetries<R2>,
2531 {
2532 let target_location = self.location().drop_consistency();
2533 Stream::new(
2534 target_location.clone(),
2535 HydroNode::MergeOrdered {
2536 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2537 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2538 metadata: target_location.new_node_metadata(Stream::<
2539 T,
2540 L::DropConsistency,
2541 B,
2542 TotalOrder,
2543 <R as MinRetries<R2>>::Min,
2544 >::collection_kind()),
2545 },
2546 )
2547 }
2548}
2549
2550impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, L, B, O, R>
2551where
2552 L: Location<'a>,
2553{
2554 pub fn sort(self) -> Stream<T, L, Bounded, TotalOrder, R>
2580 where
2581 B: IsBounded,
2582 T: Ord,
2583 {
2584 let this = self.make_bounded();
2585 Stream::new(
2586 this.location.clone(),
2587 HydroNode::Sort {
2588 input: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2589 metadata: this
2590 .location
2591 .new_node_metadata(Stream::<T, L, Bounded, TotalOrder, R>::collection_kind()),
2592 },
2593 )
2594 }
2595
2596 pub fn chain<O2: Ordering, R2: Retries, B2: Boundedness>(
2624 self,
2625 other: Stream<T, L, B2, O2, R2>,
2626 ) -> Stream<T, L, B2, <O as MinOrder<O2>>::Min, <R as MinRetries<R2>>::Min>
2627 where
2628 B: IsBounded,
2629 O: MinOrder<O2>,
2630 R: MinRetries<R2>,
2631 {
2632 check_matching_location(&self.location, &other.location);
2633
2634 Stream::new(
2635 self.location.clone(),
2636 HydroNode::Chain {
2637 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2638 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2639 metadata: self.location.new_node_metadata(Stream::<
2640 T,
2641 L,
2642 B2,
2643 <O as MinOrder<O2>>::Min,
2644 <R as MinRetries<R2>>::Min,
2645 >::collection_kind()),
2646 },
2647 )
2648 }
2649
2650 pub fn cross_product_nested_loop<T2, O2: Ordering + MinOrder<O>, R2: Retries>(
2654 self,
2655 other: Stream<T2, L, Bounded, O2, R2>,
2656 ) -> Stream<(T, T2), L, Bounded, <O2 as MinOrder<O>>::Min, <R as MinRetries<R2>>::Min>
2657 where
2658 B: IsBounded,
2659 T: Clone,
2660 T2: Clone,
2661 R: MinRetries<R2>,
2662 {
2663 let this = self.make_bounded();
2664 check_matching_location(&this.location, &other.location);
2665
2666 Stream::new(
2667 this.location.clone(),
2668 HydroNode::CrossProduct {
2669 left: Box::new(this.ir_node.replace(HydroNode::Placeholder)),
2670 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
2671 metadata: this.location.new_node_metadata(Stream::<
2672 (T, T2),
2673 L,
2674 Bounded,
2675 <O2 as MinOrder<O>>::Min,
2676 <R as MinRetries<R2>>::Min,
2677 >::collection_kind()),
2678 },
2679 )
2680 }
2681
2682 pub fn repeat_with_keys<K, V2>(
2720 self,
2721 keys: KeyedSingleton<K, V2, L, Bounded>,
2722 ) -> KeyedStream<K, T, L, Bounded, O, R>
2723 where
2724 B: IsBounded,
2725 K: Clone,
2726 T: Clone,
2727 {
2728 keys.keys()
2729 .assume_ordering_trusted::<TotalOrder>(
2730 nondet!(),
2731 )
2732 .cross_product_nested_loop(self.make_bounded())
2733 .into_keyed()
2734 }
2735
2736 pub fn resolve_futures_blocking(self) -> Stream<T::Output, L, B, NoOrder, R>
2773 where
2774 T: Future,
2775 {
2776 Stream::new(
2777 self.location.clone(),
2778 HydroNode::ResolveFuturesBlocking {
2779 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2780 metadata: self
2781 .location
2782 .new_node_metadata(Stream::<T::Output, L, B, NoOrder, R>::collection_kind()),
2783 },
2784 )
2785 }
2786
2787 #[expect(clippy::wrong_self_convention, reason = "stream function naming")]
2807 pub fn is_empty(self) -> Singleton<bool, L, Bounded>
2808 where
2809 B: IsBounded,
2810 {
2811 self.make_bounded()
2812 .assume_ordering_trusted::<TotalOrder>(
2813 nondet!(),
2814 )
2815 .first()
2816 .is_none()
2817 }
2818}
2819
2820impl<'a, K, V1, L, B: Boundedness, O: Ordering, R: Retries> Stream<(K, V1), L, B, O, R>
2821where
2822 L: Location<'a>,
2823{
2824 pub fn join<V2, B2: Boundedness, O2: Ordering, R2: Retries>(
2849 self,
2850 n: Stream<(K, V2), L, B2, O2, R2>,
2851 ) -> Stream<(K, (V1, V2)), L, B, B2::PreserveOrderIfBounded<O>, <R as MinRetries<R2>>::Min>
2852 where
2853 K: Eq + Hash + Clone,
2854 R: MinRetries<R2>,
2855 V1: Clone,
2856 V2: Clone,
2857 {
2858 check_matching_location(&self.location, &n.location);
2859
2860 let ir_node = if B2::BOUNDED {
2861 HydroNode::JoinHalf {
2862 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2863 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2864 metadata: self.location.new_node_metadata(Stream::<
2865 (K, (V1, V2)),
2866 L,
2867 B,
2868 B2::PreserveOrderIfBounded<O>,
2869 <R as MinRetries<R2>>::Min,
2870 >::collection_kind()),
2871 }
2872 } else {
2873 HydroNode::Join {
2874 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2875 right: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2876 metadata: self.location.new_node_metadata(Stream::<
2877 (K, (V1, V2)),
2878 L,
2879 B,
2880 B2::PreserveOrderIfBounded<O>,
2881 <R as MinRetries<R2>>::Min,
2882 >::collection_kind()),
2883 }
2884 };
2885
2886 Stream::new(self.location.clone(), ir_node)
2887 }
2888
2889 pub fn anti_join<O2: Ordering, R2: Retries>(
2915 self,
2916 n: Stream<K, L, Bounded, O2, R2>,
2917 ) -> Stream<(K, V1), L, B, O, R>
2918 where
2919 K: Eq + Hash,
2920 {
2921 check_matching_location(&self.location, &n.location);
2922
2923 Stream::new(
2924 self.location.clone(),
2925 HydroNode::AntiJoin {
2926 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2927 neg: Box::new(n.ir_node.replace(HydroNode::Placeholder)),
2928 metadata: self
2929 .location
2930 .new_node_metadata(Stream::<(K, V1), L, B, O, R>::collection_kind()),
2931 },
2932 )
2933 }
2934}
2935
2936impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
2937 Stream<(K, V), L, B, O, R>
2938{
2939 pub fn into_keyed(self) -> KeyedStream<K, V, L, B, O, R> {
2966 KeyedStream::new(
2967 self.location.clone(),
2968 HydroNode::Cast {
2969 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
2970 metadata: self
2971 .location
2972 .new_node_metadata(KeyedStream::<K, V, L, B, O, R>::collection_kind()),
2973 },
2974 )
2975 }
2976}
2977
2978impl<'a, K, V, L, O: Ordering, R: Retries> Stream<(K, V), Tick<L>, Bounded, O, R>
2979where
2980 K: Eq + Hash,
2981 L: Location<'a>,
2982{
2983 pub fn keys(self) -> Stream<K, Tick<L>, Bounded, NoOrder, ExactlyOnce> {
3002 self.into_keyed()
3003 .fold(
3004 q!(|| ()),
3005 q!(
3006 |_, _| {},
3007 commutative = manual_proof!(),
3008 idempotent = manual_proof!()
3009 ),
3010 )
3011 .keys()
3012 }
3013}
3014
3015impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Atomic<L>, B, O, R>
3016where
3017 L: Location<'a>,
3018{
3019 pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
3026 self,
3027 tick: &Tick<L2>,
3028 _nondet: NonDet,
3029 ) -> Stream<T, Tick<L::DropConsistency>, Bounded, O, R> {
3030 Stream::new(
3031 tick.drop_consistency(),
3032 HydroNode::Batch {
3033 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3034 metadata: tick
3035 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3036 },
3037 )
3038 }
3039
3040 pub fn end_atomic(self) -> Stream<T, L, B, O, R> {
3043 Stream::new(
3044 self.location.tick.l.clone(),
3045 HydroNode::EndAtomic {
3046 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3047 metadata: self
3048 .location
3049 .tick
3050 .l
3051 .new_node_metadata(Stream::<T, L, B, O, R>::collection_kind()),
3052 },
3053 )
3054 }
3055}
3056
3057impl<'a, F, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<F, L, B, O, R>
3058where
3059 L: TopLevel<'a>,
3060 F: Future<Output = T>,
3061{
3062 pub fn resolve_futures(self) -> Stream<T, L, Unbounded, NoOrder, R> {
3093 Stream::new(
3094 self.location.clone(),
3095 HydroNode::ResolveFutures {
3096 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3097 metadata: self
3098 .location
3099 .new_node_metadata(Stream::<T, L, Unbounded, NoOrder, R>::collection_kind()),
3100 },
3101 )
3102 }
3103
3104 pub fn resolve_futures_ordered(self) -> Stream<T, L, Unbounded, O, R> {
3135 Stream::new(
3136 self.location.clone(),
3137 HydroNode::ResolveFuturesOrdered {
3138 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3139 metadata: self
3140 .location
3141 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3142 },
3143 )
3144 }
3145}
3146
3147impl<'a, T, L, O: Ordering, R: Retries> Stream<T, Tick<L>, Bounded, O, R>
3148where
3149 L: Location<'a>,
3150{
3151 pub fn all_ticks(self) -> Stream<T, L, Unbounded, O, R> {
3154 Stream::new(
3155 self.location.outer().clone(),
3156 HydroNode::YieldConcat {
3157 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3158 metadata: self
3159 .location
3160 .outer()
3161 .new_node_metadata(Stream::<T, L, Unbounded, O, R>::collection_kind()),
3162 },
3163 )
3164 }
3165
3166 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, O, R> {
3173 let out_location = Atomic {
3174 tick: self.location.clone(),
3175 };
3176
3177 Stream::new(
3178 out_location.clone(),
3179 HydroNode::YieldConcat {
3180 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3181 metadata: out_location
3182 .new_node_metadata(Stream::<T, Atomic<L>, Unbounded, O, R>::collection_kind()),
3183 },
3184 )
3185 }
3186
3187 pub fn across_ticks<Out: BatchAtomic<'a>>(
3223 self,
3224 thunk: impl FnOnce(Stream<T, Atomic<L>, Unbounded, O, R>) -> Out,
3225 ) -> Out::Batched {
3226 thunk(self.all_ticks_atomic()).batched_atomic()
3227 }
3228
3229 pub fn defer_tick(self) -> Stream<T, Tick<L>, Bounded, O, R> {
3268 Stream::new(
3269 self.location.clone(),
3270 HydroNode::DeferTick {
3271 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
3272 metadata: self
3273 .location
3274 .new_node_metadata(Stream::<T, Tick<L>, Bounded, O, R>::collection_kind()),
3275 },
3276 )
3277 }
3278}
3279
3280#[cfg(test)]
3281mod tests {
3282 #[cfg(feature = "deploy")]
3283 use futures::{SinkExt, StreamExt};
3284 #[cfg(feature = "deploy")]
3285 use hydro_deploy::Deployment;
3286 #[cfg(feature = "deploy")]
3287 use serde::{Deserialize, Serialize};
3288 #[cfg(any(feature = "deploy", feature = "sim"))]
3289 use stageleft::q;
3290
3291 #[cfg(any(feature = "deploy", feature = "sim"))]
3292 use crate::compile::builder::FlowBuilder;
3293 #[cfg(feature = "deploy")]
3294 use crate::live_collections::sliced::sliced;
3295 #[cfg(feature = "deploy")]
3296 use crate::live_collections::stream::ExactlyOnce;
3297 #[cfg(feature = "sim")]
3298 use crate::live_collections::stream::NoOrder;
3299 #[cfg(any(feature = "deploy", feature = "sim"))]
3300 use crate::live_collections::stream::TotalOrder;
3301 #[cfg(any(feature = "deploy", feature = "sim"))]
3302 use crate::location::Location;
3303 #[cfg(feature = "sim")]
3304 use crate::networking::TCP;
3305 #[cfg(any(feature = "deploy", feature = "sim"))]
3306 use crate::nondet::nondet;
3307
3308 mod backtrace_chained_ops;
3309
3310 #[cfg(feature = "deploy")]
3311 struct P1 {}
3312 #[cfg(feature = "deploy")]
3313 struct P2 {}
3314
3315 #[cfg(feature = "deploy")]
3316 #[derive(Serialize, Deserialize, Debug)]
3317 struct SendOverNetwork {
3318 n: u32,
3319 }
3320
3321 #[cfg(feature = "deploy")]
3322 #[tokio::test]
3323 async fn first_ten_distributed() {
3324 use crate::networking::TCP;
3325
3326 let mut deployment = Deployment::new();
3327
3328 let mut flow = FlowBuilder::new();
3329 let first_node = flow.process::<P1>();
3330 let second_node = flow.process::<P2>();
3331 let external = flow.external::<P2>();
3332
3333 let numbers = first_node.source_iter(q!(0..10));
3334 let out_port = numbers
3335 .map(q!(|n| SendOverNetwork { n }))
3336 .send(&second_node, TCP.fail_stop().bincode())
3337 .send_bincode_external(&external);
3338
3339 let nodes = flow
3340 .with_process(&first_node, deployment.Localhost())
3341 .with_process(&second_node, deployment.Localhost())
3342 .with_external(&external, deployment.Localhost())
3343 .deploy(&mut deployment);
3344
3345 deployment.deploy().await.unwrap();
3346
3347 let mut external_out = nodes.connect(out_port).await;
3348
3349 deployment.start().await.unwrap();
3350
3351 for i in 0..10 {
3352 assert_eq!(external_out.next().await.unwrap().n, i);
3353 }
3354 }
3355
3356 #[cfg(feature = "deploy")]
3357 #[tokio::test]
3358 async fn first_cardinality() {
3359 let mut deployment = Deployment::new();
3360
3361 let mut flow = FlowBuilder::new();
3362 let node = flow.process::<()>();
3363 let external = flow.external::<()>();
3364
3365 let node_tick = node.tick();
3366 let count = node_tick
3367 .singleton(q!([1, 2, 3]))
3368 .into_stream()
3369 .flatten_ordered()
3370 .first()
3371 .into_stream()
3372 .count()
3373 .all_ticks()
3374 .send_bincode_external(&external);
3375
3376 let nodes = flow
3377 .with_process(&node, deployment.Localhost())
3378 .with_external(&external, deployment.Localhost())
3379 .deploy(&mut deployment);
3380
3381 deployment.deploy().await.unwrap();
3382
3383 let mut external_out = nodes.connect(count).await;
3384
3385 deployment.start().await.unwrap();
3386
3387 assert_eq!(external_out.next().await.unwrap(), 1);
3388 }
3389
3390 #[cfg(feature = "deploy")]
3391 #[tokio::test]
3392 async fn unbounded_reduce_remembers_state() {
3393 let mut deployment = Deployment::new();
3394
3395 let mut flow = FlowBuilder::new();
3396 let node = flow.process::<()>();
3397 let external = flow.external::<()>();
3398
3399 let (input_port, input) = node.source_external_bincode(&external);
3400 let out = input
3401 .reduce(q!(|acc, v| *acc += v))
3402 .sample_eager(nondet!())
3403 .send_bincode_external(&external);
3404
3405 let nodes = flow
3406 .with_process(&node, deployment.Localhost())
3407 .with_external(&external, deployment.Localhost())
3408 .deploy(&mut deployment);
3409
3410 deployment.deploy().await.unwrap();
3411
3412 let mut external_in = nodes.connect(input_port).await;
3413 let mut external_out = nodes.connect(out).await;
3414
3415 deployment.start().await.unwrap();
3416
3417 external_in.send(1).await.unwrap();
3418 assert_eq!(external_out.next().await.unwrap(), 1);
3419
3420 external_in.send(2).await.unwrap();
3421 assert_eq!(external_out.next().await.unwrap(), 3);
3422 }
3423
3424 #[cfg(feature = "deploy")]
3425 #[tokio::test]
3426 async fn top_level_bounded_cross_singleton() {
3427 let mut deployment = Deployment::new();
3428
3429 let mut flow = FlowBuilder::new();
3430 let node = flow.process::<()>();
3431 let external = flow.external::<()>();
3432
3433 let (input_port, input) =
3434 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3435
3436 let out = input
3437 .cross_singleton(
3438 node.source_iter(q!(vec![1, 2, 3]))
3439 .fold(q!(|| 0), q!(|acc, v| *acc += v)),
3440 )
3441 .send_bincode_external(&external);
3442
3443 let nodes = flow
3444 .with_process(&node, deployment.Localhost())
3445 .with_external(&external, deployment.Localhost())
3446 .deploy(&mut deployment);
3447
3448 deployment.deploy().await.unwrap();
3449
3450 let mut external_in = nodes.connect(input_port).await;
3451 let mut external_out = nodes.connect(out).await;
3452
3453 deployment.start().await.unwrap();
3454
3455 external_in.send(1).await.unwrap();
3456 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3457
3458 external_in.send(2).await.unwrap();
3459 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3460 }
3461
3462 #[cfg(feature = "deploy")]
3463 #[tokio::test]
3464 async fn top_level_bounded_reduce_cardinality() {
3465 let mut deployment = Deployment::new();
3466
3467 let mut flow = FlowBuilder::new();
3468 let node = flow.process::<()>();
3469 let external = flow.external::<()>();
3470
3471 let (input_port, input) =
3472 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3473
3474 let out = sliced! {
3475 let input = use(input, nondet!());
3476 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)), nondet!());
3477 input.cross_singleton(v.into_stream().count())
3478 }
3479 .send_bincode_external(&external);
3480
3481 let nodes = flow
3482 .with_process(&node, deployment.Localhost())
3483 .with_external(&external, deployment.Localhost())
3484 .deploy(&mut deployment);
3485
3486 deployment.deploy().await.unwrap();
3487
3488 let mut external_in = nodes.connect(input_port).await;
3489 let mut external_out = nodes.connect(out).await;
3490
3491 deployment.start().await.unwrap();
3492
3493 external_in.send(1).await.unwrap();
3494 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3495
3496 external_in.send(2).await.unwrap();
3497 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3498 }
3499
3500 #[cfg(feature = "deploy")]
3501 #[tokio::test]
3502 async fn top_level_bounded_into_singleton_cardinality() {
3503 let mut deployment = Deployment::new();
3504
3505 let mut flow = FlowBuilder::new();
3506 let node = flow.process::<()>();
3507 let external = flow.external::<()>();
3508
3509 let (input_port, input) =
3510 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3511
3512 let out = sliced! {
3513 let input = use(input, nondet!());
3514 let v = use(node.source_iter(q!(vec![1, 2, 3])).reduce(q!(|acc, v| *acc += v)).into_singleton(), nondet!());
3515 input.cross_singleton(v.into_stream().count())
3516 }
3517 .send_bincode_external(&external);
3518
3519 let nodes = flow
3520 .with_process(&node, deployment.Localhost())
3521 .with_external(&external, deployment.Localhost())
3522 .deploy(&mut deployment);
3523
3524 deployment.deploy().await.unwrap();
3525
3526 let mut external_in = nodes.connect(input_port).await;
3527 let mut external_out = nodes.connect(out).await;
3528
3529 deployment.start().await.unwrap();
3530
3531 external_in.send(1).await.unwrap();
3532 assert_eq!(external_out.next().await.unwrap(), (1, 1));
3533
3534 external_in.send(2).await.unwrap();
3535 assert_eq!(external_out.next().await.unwrap(), (2, 1));
3536 }
3537
3538 #[cfg(feature = "deploy")]
3539 #[tokio::test]
3540 async fn atomic_fold_replays_each_tick() {
3541 let mut deployment = Deployment::new();
3542
3543 let mut flow = FlowBuilder::new();
3544 let node = flow.process::<()>();
3545 let external = flow.external::<()>();
3546
3547 let (input_port, input) =
3548 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3549 let tick = node.tick();
3550
3551 let out = input
3552 .batch(&tick, nondet!())
3553 .cross_singleton(
3554 node.source_iter(q!(vec![1, 2, 3]))
3555 .atomic()
3556 .fold(q!(|| 0), q!(|acc, v| *acc += v))
3557 .snapshot_atomic(&tick, nondet!()),
3558 )
3559 .all_ticks()
3560 .send_bincode_external(&external);
3561
3562 let nodes = flow
3563 .with_process(&node, deployment.Localhost())
3564 .with_external(&external, deployment.Localhost())
3565 .deploy(&mut deployment);
3566
3567 deployment.deploy().await.unwrap();
3568
3569 let mut external_in = nodes.connect(input_port).await;
3570 let mut external_out = nodes.connect(out).await;
3571
3572 deployment.start().await.unwrap();
3573
3574 external_in.send(1).await.unwrap();
3575 assert_eq!(external_out.next().await.unwrap(), (1, 6));
3576
3577 external_in.send(2).await.unwrap();
3578 assert_eq!(external_out.next().await.unwrap(), (2, 6));
3579 }
3580
3581 #[cfg(feature = "deploy")]
3582 #[tokio::test]
3583 async fn unbounded_scan_remembers_state() {
3584 let mut deployment = Deployment::new();
3585
3586 let mut flow = FlowBuilder::new();
3587 let node = flow.process::<()>();
3588 let external = flow.external::<()>();
3589
3590 let (input_port, input) = node.source_external_bincode(&external);
3591 let out = input
3592 .scan(
3593 q!(|| 0),
3594 q!(|acc, v| {
3595 *acc += v;
3596 Some(*acc)
3597 }),
3598 )
3599 .send_bincode_external(&external);
3600
3601 let nodes = flow
3602 .with_process(&node, deployment.Localhost())
3603 .with_external(&external, deployment.Localhost())
3604 .deploy(&mut deployment);
3605
3606 deployment.deploy().await.unwrap();
3607
3608 let mut external_in = nodes.connect(input_port).await;
3609 let mut external_out = nodes.connect(out).await;
3610
3611 deployment.start().await.unwrap();
3612
3613 external_in.send(1).await.unwrap();
3614 assert_eq!(external_out.next().await.unwrap(), 1);
3615
3616 external_in.send(2).await.unwrap();
3617 assert_eq!(external_out.next().await.unwrap(), 3);
3618 }
3619
3620 #[cfg(feature = "deploy")]
3621 #[tokio::test]
3622 async fn unbounded_enumerate_remembers_state() {
3623 let mut deployment = Deployment::new();
3624
3625 let mut flow = FlowBuilder::new();
3626 let node = flow.process::<()>();
3627 let external = flow.external::<()>();
3628
3629 let (input_port, input) = node.source_external_bincode(&external);
3630 let out = input.enumerate().send_bincode_external(&external);
3631
3632 let nodes = flow
3633 .with_process(&node, deployment.Localhost())
3634 .with_external(&external, deployment.Localhost())
3635 .deploy(&mut deployment);
3636
3637 deployment.deploy().await.unwrap();
3638
3639 let mut external_in = nodes.connect(input_port).await;
3640 let mut external_out = nodes.connect(out).await;
3641
3642 deployment.start().await.unwrap();
3643
3644 external_in.send(1).await.unwrap();
3645 assert_eq!(external_out.next().await.unwrap(), (0, 1));
3646
3647 external_in.send(2).await.unwrap();
3648 assert_eq!(external_out.next().await.unwrap(), (1, 2));
3649 }
3650
3651 #[cfg(feature = "deploy")]
3652 #[tokio::test]
3653 async fn unbounded_unique_remembers_state() {
3654 let mut deployment = Deployment::new();
3655
3656 let mut flow = FlowBuilder::new();
3657 let node = flow.process::<()>();
3658 let external = flow.external::<()>();
3659
3660 let (input_port, input) =
3661 node.source_external_bincode::<_, _, TotalOrder, ExactlyOnce>(&external);
3662 let out = input.unique().send_bincode_external(&external);
3663
3664 let nodes = flow
3665 .with_process(&node, deployment.Localhost())
3666 .with_external(&external, deployment.Localhost())
3667 .deploy(&mut deployment);
3668
3669 deployment.deploy().await.unwrap();
3670
3671 let mut external_in = nodes.connect(input_port).await;
3672 let mut external_out = nodes.connect(out).await;
3673
3674 deployment.start().await.unwrap();
3675
3676 external_in.send(1).await.unwrap();
3677 assert_eq!(external_out.next().await.unwrap(), 1);
3678
3679 external_in.send(2).await.unwrap();
3680 assert_eq!(external_out.next().await.unwrap(), 2);
3681
3682 external_in.send(1).await.unwrap();
3683 external_in.send(3).await.unwrap();
3684 assert_eq!(external_out.next().await.unwrap(), 3);
3685 }
3686
3687 #[cfg(feature = "sim")]
3688 #[test]
3689 #[should_panic]
3690 fn sim_batch_nondet_size() {
3691 let mut flow = FlowBuilder::new();
3692 let node = flow.process::<()>();
3693
3694 let (in_send, input) = node.sim_input::<_, TotalOrder, _>();
3695
3696 let tick = node.tick();
3697 let out_recv = input
3698 .batch(&tick, nondet!())
3699 .count()
3700 .all_ticks()
3701 .sim_output();
3702
3703 flow.sim().exhaustive(async || {
3704 in_send.send(());
3705 in_send.send(());
3706 in_send.send(());
3707
3708 assert_eq!(out_recv.next().await.unwrap(), 3); });
3710 }
3711
3712 #[cfg(feature = "sim")]
3713 #[test]
3714 fn sim_batch_preserves_order() {
3715 let mut flow = FlowBuilder::new();
3716 let node = flow.process::<()>();
3717
3718 let (in_send, input) = node.sim_input();
3719
3720 let tick = node.tick();
3721 let out_recv = input
3722 .batch(&tick, nondet!())
3723 .all_ticks()
3724 .sim_output();
3725
3726 flow.sim().exhaustive(async || {
3727 in_send.send(1);
3728 in_send.send(2);
3729 in_send.send(3);
3730
3731 out_recv.assert_yields_only([1, 2, 3]).await;
3732 });
3733 }
3734
3735 #[cfg(feature = "sim")]
3736 #[test]
3737 #[should_panic]
3738 fn sim_batch_unordered_shuffles() {
3739 let mut flow = FlowBuilder::new();
3740 let node = flow.process::<()>();
3741
3742 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3743
3744 let tick = node.tick();
3745 let batch = input.batch(&tick, nondet!());
3746 let out_recv = batch
3747 .clone()
3748 .min()
3749 .zip(batch.max())
3750 .all_ticks()
3751 .sim_output();
3752
3753 flow.sim().exhaustive(async || {
3754 in_send.send_many_unordered([1, 2, 3]);
3755
3756 if out_recv.collect::<Vec<_>>().await == vec![(1, 3), (2, 2)] {
3757 panic!("saw both (1, 3) and (2, 2), so batching must have shuffled the order");
3758 }
3759 });
3760 }
3761
3762 #[cfg(feature = "sim")]
3763 #[test]
3764 fn sim_batch_unordered_shuffles_count() {
3765 let mut flow = FlowBuilder::new();
3766 let node = flow.process::<()>();
3767
3768 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3769
3770 let tick = node.tick();
3771 let batch = input.batch(&tick, nondet!());
3772 let out_recv = batch.all_ticks().sim_output();
3773
3774 let instance_count = flow.sim().exhaustive(async || {
3775 in_send.send_many_unordered([1, 2, 3, 4]);
3776 out_recv.assert_yields_only_unordered([1, 2, 3, 4]).await;
3777 });
3778
3779 assert_eq!(
3780 instance_count,
3781 75 )
3783 }
3784
3785 #[cfg(feature = "sim")]
3786 #[test]
3787 #[should_panic]
3788 fn sim_observe_order_batched() {
3789 let mut flow = FlowBuilder::new();
3790 let node = flow.process::<()>();
3791
3792 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3793
3794 let tick = node.tick();
3795 let batch = input.batch(&tick, nondet!());
3796 let out_recv = batch
3797 .assume_ordering::<TotalOrder>(nondet!())
3798 .all_ticks()
3799 .sim_output();
3800
3801 flow.sim().exhaustive(async || {
3802 in_send.send_many_unordered([1, 2, 3, 4]);
3803 out_recv.assert_yields_only([1, 2, 3, 4]).await; });
3805 }
3806
3807 #[cfg(feature = "sim")]
3808 #[test]
3809 fn sim_observe_order_batched_count() {
3810 let mut flow = FlowBuilder::new();
3811 let node = flow.process::<()>();
3812
3813 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3814
3815 let tick = node.tick();
3816 let batch = input.batch(&tick, nondet!());
3817 let out_recv = batch
3818 .assume_ordering::<TotalOrder>(nondet!())
3819 .all_ticks()
3820 .sim_output();
3821
3822 let instance_count = flow.sim().exhaustive(async || {
3823 in_send.send_many_unordered([1, 2, 3, 4]);
3824 let _ = out_recv.collect::<Vec<_>>().await;
3825 });
3826
3827 assert_eq!(
3828 instance_count,
3829 192 )
3831 }
3832
3833 #[cfg(feature = "sim")]
3834 #[test]
3835 fn sim_unordered_count_instance_count() {
3836 let mut flow = FlowBuilder::new();
3837 let node = flow.process::<()>();
3838
3839 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3840
3841 let tick = node.tick();
3842 let out_recv = input
3843 .count()
3844 .snapshot(&tick, nondet!())
3845 .all_ticks()
3846 .sim_output();
3847
3848 let instance_count = flow.sim().exhaustive(async || {
3849 in_send.send_many_unordered([1, 2, 3, 4]);
3850 assert!(out_recv.collect::<Vec<_>>().await.last().unwrap() == &4);
3851 });
3852
3853 assert_eq!(
3854 instance_count,
3855 16 )
3857 }
3858
3859 #[cfg(feature = "sim")]
3860 #[test]
3861 fn sim_top_level_assume_ordering() {
3862 let mut flow = FlowBuilder::new();
3863 let node = flow.process::<()>();
3864
3865 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3866
3867 let out_recv = input
3868 .assume_ordering::<TotalOrder>(nondet!())
3869 .sim_output();
3870
3871 let instance_count = flow.sim().exhaustive(async || {
3872 in_send.send_many_unordered([1, 2, 3]);
3873 let mut out = out_recv.collect::<Vec<_>>().await;
3874 out.sort();
3875 assert_eq!(out, vec![1, 2, 3]);
3876 });
3877
3878 assert_eq!(instance_count, 6)
3879 }
3880
3881 #[cfg(feature = "sim")]
3882 #[test]
3883 fn sim_top_level_assume_ordering_cycle_back() {
3884 let mut flow = FlowBuilder::new();
3885 let node = flow.process::<()>();
3886 let node2 = flow.process::<()>();
3887
3888 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3889
3890 let (complete_cycle_back, cycle_back) =
3891 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3892 let ordered = input
3893 .merge_unordered(cycle_back)
3894 .assume_ordering::<TotalOrder>(nondet!());
3895 complete_cycle_back.complete(
3896 ordered
3897 .clone()
3898 .map(q!(|v| v + 1))
3899 .filter(q!(|v| v % 2 == 1))
3900 .send(&node2, TCP.fail_stop().bincode())
3901 .send(&node, TCP.fail_stop().bincode()),
3902 );
3903
3904 let out_recv = ordered.sim_output();
3905
3906 let mut saw = false;
3907 let instance_count = flow.sim().exhaustive(async || {
3908 in_send.send_many_unordered([0, 2]);
3909 let out = out_recv.collect::<Vec<_>>().await;
3910
3911 if out.starts_with(&[0, 1, 2]) {
3912 saw = true;
3913 }
3914 });
3915
3916 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3917 assert_eq!(instance_count, 6);
3918 }
3919
3920 #[cfg(feature = "sim")]
3921 #[test]
3922 fn sim_top_level_assume_ordering_cycle_back_tick() {
3923 let mut flow = FlowBuilder::new();
3924 let node = flow.process::<()>();
3925 let node2 = flow.process::<()>();
3926
3927 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3928
3929 let (complete_cycle_back, cycle_back) =
3930 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3931 let ordered = input
3932 .merge_unordered(cycle_back)
3933 .assume_ordering::<TotalOrder>(nondet!());
3934 complete_cycle_back.complete(
3935 ordered
3936 .clone()
3937 .batch(&node.tick(), nondet!())
3938 .all_ticks()
3939 .map(q!(|v| v + 1))
3940 .filter(q!(|v| v % 2 == 1))
3941 .send(&node2, TCP.fail_stop().bincode())
3942 .send(&node, TCP.fail_stop().bincode()),
3943 );
3944
3945 let out_recv = ordered.sim_output();
3946
3947 let mut saw = false;
3948 let instance_count = flow.sim().exhaustive(async || {
3949 in_send.send_many_unordered([0, 2]);
3950 let out = out_recv.collect::<Vec<_>>().await;
3951
3952 if out.starts_with(&[0, 1, 2]) {
3953 saw = true;
3954 }
3955 });
3956
3957 assert!(saw, "did not see an instance with 0, 1, 2 in order");
3958 assert_eq!(instance_count, 58);
3959 }
3960
3961 #[cfg(feature = "sim")]
3962 #[test]
3963 fn sim_top_level_assume_ordering_multiple() {
3964 let mut flow = FlowBuilder::new();
3965 let node = flow.process::<()>();
3966 let node2 = flow.process::<()>();
3967
3968 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
3969 let (_, input2) = node.sim_input::<_, NoOrder, _>();
3970
3971 let (complete_cycle_back, cycle_back) =
3972 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
3973 let input1_ordered = input
3974 .clone()
3975 .merge_unordered(cycle_back)
3976 .assume_ordering::<TotalOrder>(nondet!());
3977 let foo = input1_ordered
3978 .clone()
3979 .map(q!(|v| v + 3))
3980 .weaken_ordering::<NoOrder>()
3981 .merge_unordered(input2)
3982 .assume_ordering::<TotalOrder>(nondet!());
3983
3984 complete_cycle_back.complete(
3985 foo.filter(q!(|v| *v == 3))
3986 .send(&node2, TCP.fail_stop().bincode())
3987 .send(&node, TCP.fail_stop().bincode()),
3988 );
3989
3990 let out_recv = input1_ordered.sim_output();
3991
3992 let mut saw = false;
3993 let instance_count = flow.sim().exhaustive(async || {
3994 in_send.send_many_unordered([0, 1]);
3995 let out = out_recv.collect::<Vec<_>>().await;
3996
3997 if out.starts_with(&[0, 3, 1]) {
3998 saw = true;
3999 }
4000 });
4001
4002 assert!(saw, "did not see an instance with 0, 3, 1 in order");
4003 assert_eq!(instance_count, 24);
4004 }
4005
4006 #[cfg(feature = "sim")]
4007 #[test]
4008 fn sim_atomic_assume_ordering_cycle_back() {
4009 let mut flow = FlowBuilder::new();
4010 let node = flow.process::<()>();
4011 let node2 = flow.process::<()>();
4012
4013 let (in_send, input) = node.sim_input::<_, NoOrder, _>();
4014
4015 let (complete_cycle_back, cycle_back) =
4016 node.forward_ref::<super::Stream<_, _, _, NoOrder>>();
4017 let ordered = input
4018 .merge_unordered(cycle_back)
4019 .atomic()
4020 .assume_ordering::<TotalOrder>(nondet!())
4021 .end_atomic();
4022 complete_cycle_back.complete(
4023 ordered
4024 .clone()
4025 .map(q!(|v| v + 1))
4026 .filter(q!(|v| v % 2 == 1))
4027 .send(&node2, TCP.fail_stop().bincode())
4028 .send(&node, TCP.fail_stop().bincode()),
4029 );
4030
4031 let out_recv = ordered.sim_output();
4032
4033 let instance_count = flow.sim().exhaustive(async || {
4034 in_send.send_many_unordered([0, 2]);
4035 let out = out_recv.collect::<Vec<_>>().await;
4036 assert_eq!(out.len(), 4);
4037 });
4038 assert_eq!(instance_count, 22);
4039 }
4040
4041 #[cfg(feature = "deploy")]
4042 #[tokio::test]
4043 async fn partition_evens_odds() {
4044 let mut deployment = Deployment::new();
4045
4046 let mut flow = FlowBuilder::new();
4047 let node = flow.process::<()>();
4048 let external = flow.external::<()>();
4049
4050 let numbers = node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6]));
4051 let (evens, odds) = numbers.partition(q!(|x: &i32| x % 2 == 0));
4052 let evens_port = evens.send_bincode_external(&external);
4053 let odds_port = odds.send_bincode_external(&external);
4054
4055 let nodes = flow
4056 .with_process(&node, deployment.Localhost())
4057 .with_external(&external, deployment.Localhost())
4058 .deploy(&mut deployment);
4059
4060 deployment.deploy().await.unwrap();
4061
4062 let mut evens_out = nodes.connect(evens_port).await;
4063 let mut odds_out = nodes.connect(odds_port).await;
4064
4065 deployment.start().await.unwrap();
4066
4067 let mut even_results = Vec::new();
4068 for _ in 0..3 {
4069 even_results.push(evens_out.next().await.unwrap());
4070 }
4071 even_results.sort();
4072 assert_eq!(even_results, vec![2, 4, 6]);
4073
4074 let mut odd_results = Vec::new();
4075 for _ in 0..3 {
4076 odd_results.push(odds_out.next().await.unwrap());
4077 }
4078 odd_results.sort();
4079 assert_eq!(odd_results, vec![1, 3, 5]);
4080 }
4081
4082 #[cfg(feature = "deploy")]
4083 #[tokio::test]
4084 async fn unconsumed_inspect_still_runs() {
4085 use crate::deploy::DeployCrateWrapper;
4086
4087 let mut deployment = Deployment::new();
4088
4089 let mut flow = FlowBuilder::new();
4090 let node = flow.process::<()>();
4091
4092 node.source_iter(q!(0..5))
4095 .inspect(q!(|x| println!("inspect: {}", x)));
4096
4097 let nodes = flow
4098 .with_process(&node, deployment.Localhost())
4099 .deploy(&mut deployment);
4100
4101 deployment.deploy().await.unwrap();
4102
4103 let mut stdout = nodes.get_process(&node).stdout();
4104
4105 deployment.start().await.unwrap();
4106
4107 let mut lines = Vec::new();
4108 for _ in 0..5 {
4109 lines.push(stdout.recv().await.unwrap());
4110 }
4111 lines.sort();
4112 assert_eq!(
4113 lines,
4114 vec![
4115 "inspect: 0",
4116 "inspect: 1",
4117 "inspect: 2",
4118 "inspect: 3",
4119 "inspect: 4",
4120 ]
4121 );
4122 }
4123
4124 #[cfg(feature = "sim")]
4125 #[test]
4126 fn sim_limit() {
4127 let mut flow = FlowBuilder::new();
4128 let node = flow.process::<()>();
4129
4130 let (in_send, input) = node.sim_input();
4131
4132 let out_recv = input.limit(q!(3)).sim_output();
4133
4134 flow.sim().exhaustive(async || {
4135 in_send.send(1);
4136 in_send.send(2);
4137 in_send.send(3);
4138 in_send.send(4);
4139 in_send.send(5);
4140
4141 out_recv.assert_yields_only([1, 2, 3]).await;
4142 });
4143 }
4144
4145 #[cfg(feature = "sim")]
4146 #[test]
4147 fn sim_limit_zero() {
4148 let mut flow = FlowBuilder::new();
4149 let node = flow.process::<()>();
4150
4151 let (in_send, input) = node.sim_input();
4152
4153 let out_recv = input.limit(q!(0)).sim_output();
4154
4155 flow.sim().exhaustive(async || {
4156 in_send.send(1);
4157 in_send.send(2);
4158
4159 out_recv.assert_yields_only::<i32, _>([]).await;
4160 });
4161 }
4162
4163 #[cfg(feature = "sim")]
4164 #[test]
4165 fn sim_merge_ordered() {
4166 let mut flow = FlowBuilder::new();
4167 let node = flow.process::<()>();
4168
4169 let (in_send, input) = node.sim_input();
4170 let (in_send2, input2) = node.sim_input();
4171
4172 let out_recv = input
4173 .merge_ordered(input2, nondet!())
4174 .sim_output();
4175
4176 let mut saw_out_of_order = false;
4177 let instances = flow.sim().exhaustive(async || {
4178 in_send.send(1);
4179 in_send.send(2);
4180 in_send2.send(3);
4181 in_send2.send(4);
4182
4183 let out = out_recv.collect::<Vec<_>>().await;
4184
4185 if out == [1, 3, 2, 4] {
4186 saw_out_of_order = true;
4187 }
4188
4189 let mut first_elements = out.iter().filter(|v| **v <= 2).copied().collect::<Vec<_>>();
4192 let mut second_elements = out.iter().filter(|v| **v > 2).copied().collect::<Vec<_>>();
4193 assert_eq!(
4194 first_elements,
4195 vec![1, 2],
4196 "first input order violated: {:?}",
4197 out
4198 );
4199 assert_eq!(
4200 second_elements,
4201 vec![3, 4],
4202 "second input order violated: {:?}",
4203 out
4204 );
4205
4206 first_elements.append(&mut second_elements);
4207 first_elements.sort();
4208 assert_eq!(first_elements, vec![1, 2, 3, 4]);
4209 });
4210
4211 assert!(saw_out_of_order);
4212 assert_eq!(instances, 6);
4213 }
4214
4215 #[cfg(feature = "sim")]
4218 #[test]
4219 fn sim_merge_ordered_one_empty() {
4220 let mut flow = FlowBuilder::new();
4221 let node = flow.process::<()>();
4222
4223 let (in_send, input) = node.sim_input();
4224 let (_in_send2, input2) = node.sim_input();
4225
4226 let out_recv = input
4227 .merge_ordered(input2, nondet!())
4228 .sim_output();
4229
4230 let instances = flow.sim().exhaustive(async || {
4231 in_send.send(1);
4232 in_send.send(2);
4233
4234 let out = out_recv.collect::<Vec<_>>().await;
4235 assert_eq!(out, vec![1, 2]);
4236 });
4237
4238 assert_eq!(instances, 1);
4240 }
4241
4242 #[cfg(feature = "sim")]
4248 #[test]
4249 fn sim_merge_ordered_cycle_back() {
4250 let mut flow = FlowBuilder::new();
4251 let node = flow.process::<()>();
4252
4253 let (in_send, input) = node.sim_input();
4254
4255 let (complete_cycle_back, cycle_back) =
4257 node.forward_ref::<super::Stream<_, _, _, TotalOrder>>();
4258
4259 let merged = input.merge_ordered(cycle_back, nondet!());
4261
4262 complete_cycle_back.complete(merged.clone().filter(q!(|v| *v == 1)).map(q!(|v| v * 10)));
4264
4265 let out_recv = merged.sim_output();
4266
4267 let mut saw_cycle_before_second = false;
4270 flow.sim().exhaustive(async || {
4271 in_send.send(1);
4272 in_send.send(2);
4273
4274 let out = out_recv.collect::<Vec<_>>().await;
4275
4276 let pos_1 = out.iter().position(|v| *v == 1).unwrap();
4278 let pos_10 = out.iter().position(|v| *v == 10).unwrap();
4279 assert!(pos_1 < pos_10, "causal order violated: {:?}", out);
4280
4281 if out == [1, 10, 2] {
4283 saw_cycle_before_second = true;
4284 }
4285
4286 let mut sorted = out;
4287 sorted.sort();
4288 assert_eq!(sorted, vec![1, 2, 10]);
4289 });
4290
4291 assert!(
4292 saw_cycle_before_second,
4293 "never saw the cycled element arrive before the second input element"
4294 );
4295 }
4296
4297 #[cfg(feature = "sim")]
4301 #[test]
4302 fn sim_merge_ordered_delayed() {
4303 let mut flow = FlowBuilder::new();
4304 let node = flow.process::<()>();
4305
4306 let (in_send, input) = node.sim_input();
4307 let (in_send2, input2) = node.sim_input();
4308
4309 let out_recv = input
4310 .merge_ordered(input2, nondet!())
4311 .sim_output();
4312
4313 let mut saw_delayed_interleaving = false;
4314 flow.sim().exhaustive(async || {
4315 in_send.send(1);
4317 in_send2.send(3);
4318 in_send2.send(4);
4319
4320 let first_batch = out_recv.collect::<Vec<_>>().await;
4322
4323 in_send.send(2);
4325 let second_batch = out_recv.collect::<Vec<_>>().await;
4326
4327 let mut all: Vec<_> = first_batch
4328 .iter()
4329 .chain(second_batch.iter())
4330 .copied()
4331 .collect();
4332
4333 if all == [1, 3, 4, 2] {
4335 saw_delayed_interleaving = true;
4336 }
4337
4338 all.sort();
4339 assert_eq!(all, vec![1, 2, 3, 4]);
4340 });
4341
4342 assert!(saw_delayed_interleaving);
4343 }
4344
4345 #[cfg(feature = "deploy")]
4350 #[tokio::test]
4351 async fn deploy_merge_ordered_delayed() {
4352 let mut deployment = Deployment::new();
4353
4354 let mut flow = FlowBuilder::new();
4355 let node = flow.process::<()>();
4356 let external = flow.external::<()>();
4357
4358 let (input_a_port, input_a) = node.source_external_bincode(&external);
4359 let (input_b_port, input_b) = node.source_external_bincode(&external);
4360
4361 let out = input_a
4362 .assume_ordering(nondet!())
4363 .merge_ordered(
4364 input_b.assume_ordering(nondet!()),
4365 nondet!(),
4366 )
4367 .send_bincode_external(&external);
4368
4369 let nodes = flow
4370 .with_process(&node, deployment.Localhost())
4371 .with_external(&external, deployment.Localhost())
4372 .deploy(&mut deployment);
4373
4374 deployment.deploy().await.unwrap();
4375
4376 let mut ext_a = nodes.connect(input_a_port).await;
4377 let mut ext_b = nodes.connect(input_b_port).await;
4378 let mut ext_out = nodes.connect(out).await;
4379
4380 deployment.start().await.unwrap();
4381
4382 ext_a.send(1).await.unwrap();
4384 ext_b.send(3).await.unwrap();
4385 ext_b.send(4).await.unwrap();
4386
4387 let mut received = Vec::new();
4389 for _ in 0..3 {
4390 received.push(ext_out.next().await.unwrap());
4391 }
4392
4393 ext_a.send(2).await.unwrap();
4395 received.push(ext_out.next().await.unwrap());
4396
4397 received.sort();
4399 assert_eq!(received, vec![1, 2, 3, 4]);
4400 }
4401
4402 #[cfg(feature = "deploy")]
4403 #[tokio::test]
4404 async fn monotone_fold_threshold() {
4405 use crate::properties::manual_proof;
4406
4407 let mut deployment = Deployment::new();
4408
4409 let mut flow = FlowBuilder::new();
4410 let node = flow.process::<()>();
4411 let external = flow.external::<()>();
4412
4413 let in_unbounded: super::Stream<_, _> =
4414 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4415 let sum = in_unbounded.fold(
4416 q!(|| 0),
4417 q!(
4418 |sum, v| {
4419 *sum += v;
4420 },
4421 monotone = manual_proof!()
4422 ),
4423 );
4424
4425 let threshold_out = sum
4426 .threshold_greater_or_equal(node.singleton(q!(7)))
4427 .send_bincode_external(&external);
4428
4429 let nodes = flow
4430 .with_process(&node, deployment.Localhost())
4431 .with_external(&external, deployment.Localhost())
4432 .deploy(&mut deployment);
4433
4434 deployment.deploy().await.unwrap();
4435
4436 let mut threshold_out = nodes.connect(threshold_out).await;
4437
4438 deployment.start().await.unwrap();
4439
4440 assert_eq!(threshold_out.next().await.unwrap(), 7);
4441 }
4442
4443 #[cfg(feature = "deploy")]
4444 #[tokio::test]
4445 async fn monotone_count_threshold() {
4446 let mut deployment = Deployment::new();
4447
4448 let mut flow = FlowBuilder::new();
4449 let node = flow.process::<()>();
4450 let external = flow.external::<()>();
4451
4452 let in_unbounded: super::Stream<_, _> =
4453 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4454 let sum = in_unbounded.count();
4455
4456 let threshold_out = sum
4457 .threshold_greater_or_equal(node.singleton(q!(3)))
4458 .send_bincode_external(&external);
4459
4460 let nodes = flow
4461 .with_process(&node, deployment.Localhost())
4462 .with_external(&external, deployment.Localhost())
4463 .deploy(&mut deployment);
4464
4465 deployment.deploy().await.unwrap();
4466
4467 let mut threshold_out = nodes.connect(threshold_out).await;
4468
4469 deployment.start().await.unwrap();
4470
4471 assert_eq!(threshold_out.next().await.unwrap(), 3);
4472 }
4473
4474 #[cfg(feature = "deploy")]
4475 #[tokio::test]
4476 async fn monotone_map_order_preserving_threshold() {
4477 use crate::properties::manual_proof;
4478
4479 let mut deployment = Deployment::new();
4480
4481 let mut flow = FlowBuilder::new();
4482 let node = flow.process::<()>();
4483 let external = flow.external::<()>();
4484
4485 let in_unbounded: super::Stream<_, _> =
4486 node.source_iter(q!(vec![1i32, 2, 3, 4, 5, 6])).into();
4487 let sum = in_unbounded.fold(
4488 q!(|| 0),
4489 q!(
4490 |sum, v| {
4491 *sum += v;
4492 },
4493 monotone = manual_proof!()
4494 ),
4495 );
4496
4497 let doubled = sum.map(q!(
4499 |v| v * 2,
4500 order_preserving = manual_proof!()
4501 ));
4502
4503 let threshold_out = doubled
4504 .threshold_greater_or_equal(node.singleton(q!(14)))
4505 .send_bincode_external(&external);
4506
4507 let nodes = flow
4508 .with_process(&node, deployment.Localhost())
4509 .with_external(&external, deployment.Localhost())
4510 .deploy(&mut deployment);
4511
4512 deployment.deploy().await.unwrap();
4513
4514 let mut threshold_out = nodes.connect(threshold_out).await;
4515
4516 deployment.start().await.unwrap();
4517
4518 assert_eq!(threshold_out.next().await.unwrap(), 14);
4519 }
4520
4521 #[cfg(any(feature = "deploy", feature = "sim"))]
4524 mod join_ordering_type_tests {
4525 use crate::live_collections::boundedness::{Bounded, Unbounded};
4526 use crate::live_collections::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
4527 use crate::location::{Location, Process};
4528
4529 #[expect(dead_code, reason = "compile-time type test")]
4530 fn join_unbounded_with_bounded_preserves_order<'a>(
4531 left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4532 right: Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4533 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4534 left.join(right)
4535 }
4536
4537 #[expect(dead_code, reason = "compile-time type test")]
4538 fn join_unbounded_with_unbounded_is_no_order<'a>(
4539 left: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4540 right: Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4541 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4542 left.join(right)
4543 }
4544
4545 #[expect(dead_code, reason = "compile-time type test")]
4546 fn join_bounded_with_bounded_preserves_order<'a, L: Location<'a>>(
4547 left: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4548 right: Stream<(i32, char), L, Bounded, TotalOrder, ExactlyOnce>,
4549 ) -> Stream<(i32, (char, char)), L, Bounded, TotalOrder, ExactlyOnce> {
4550 left.join(right)
4551 }
4552
4553 #[expect(dead_code, reason = "compile-time type test")]
4554 fn join_unbounded_noorder_with_bounded<'a>(
4555 left: Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce>,
4556 right: Stream<(i32, char), Process<'a>, Bounded, NoOrder, ExactlyOnce>,
4557 ) -> Stream<(i32, (char, char)), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4558 left.join(right)
4559 }
4560
4561 #[expect(dead_code, reason = "compile-time type test")]
4564 fn cross_product_unbounded_with_bounded_preserves_order<'a>(
4565 left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4566 right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4567 ) -> Stream<(i32, char), Process<'a>, Unbounded, TotalOrder, ExactlyOnce> {
4568 left.cross_product(right)
4569 }
4570
4571 #[expect(dead_code, reason = "compile-time type test")]
4572 fn cross_product_bounded_with_bounded_preserves_order<'a>(
4573 left: Stream<i32, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4574 right: Stream<char, Process<'a>, Bounded, TotalOrder, ExactlyOnce>,
4575 ) -> Stream<(i32, char), Process<'a>, Bounded, TotalOrder, ExactlyOnce> {
4576 left.cross_product(right)
4577 }
4578
4579 #[expect(dead_code, reason = "compile-time type test")]
4580 fn cross_product_unbounded_with_unbounded_is_no_order<'a>(
4581 left: Stream<i32, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4582 right: Stream<char, Process<'a>, Unbounded, TotalOrder, ExactlyOnce>,
4583 ) -> Stream<(i32, char), Process<'a>, Unbounded, NoOrder, ExactlyOnce> {
4584 left.cross_product(right)
4585 }
4586 } #[cfg(feature = "sim")]
4591 #[test]
4592 fn cross_product_mixed_boundedness_correctness() {
4593 use stageleft::q;
4594
4595 use crate::compile::builder::FlowBuilder;
4596 use crate::nondet::nondet;
4597
4598 let mut flow = FlowBuilder::new();
4599 let process = flow.process::<()>();
4600 let tick = process.tick();
4601
4602 let left = process.source_iter(q!(vec![1, 2]));
4603 let right = process
4604 .source_iter(q!(vec!['a', 'b']))
4605 .batch(&tick, nondet!())
4606 .all_ticks();
4607
4608 let out = left.cross_product(right).sim_output();
4609
4610 flow.sim().exhaustive(async || {
4611 out.assert_yields_only_unordered(vec![(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')])
4612 .await;
4613 });
4614 }
4615
4616 #[cfg(feature = "sim")]
4617 #[test]
4618 fn join_mixed_boundedness_correctness() {
4619 use stageleft::q;
4620
4621 use crate::compile::builder::FlowBuilder;
4622 use crate::nondet::nondet;
4623
4624 let mut flow = FlowBuilder::new();
4625 let process = flow.process::<()>();
4626 let tick = process.tick();
4627
4628 let left = process.source_iter(q!(vec![(1, 'a'), (2, 'b')]));
4629 let right = process
4630 .source_iter(q!(vec![(1, 'x'), (2, 'y')]))
4631 .batch(&tick, nondet!())
4632 .all_ticks();
4633
4634 let out = left.join(right).sim_output();
4635
4636 flow.sim().exhaustive(async || {
4637 out.assert_yields_only_unordered(vec![(1, ('a', 'x')), (2, ('b', 'y'))])
4638 .await;
4639 });
4640 }
4641
4642 #[cfg(feature = "sim")]
4643 #[test]
4644 fn sim_merge_unordered_independent_atomics() {
4645 let mut flow = FlowBuilder::new();
4646 let node = flow.process::<()>();
4647
4648 let (in1_send, input1) = node.sim_input::<_, TotalOrder, _>();
4649 let (in2_send, input2) = node.sim_input::<_, TotalOrder, _>();
4650
4651 let out = input1
4652 .atomic()
4653 .merge_unordered(input2.atomic())
4654 .end_atomic()
4655 .sim_output();
4656
4657 flow.sim().exhaustive(async || {
4658 in1_send.send(1);
4659 in2_send.send(2);
4660
4661 out.assert_yields_only_unordered(vec![1, 2]).await;
4662 });
4663 }
4664
4665 #[cfg(feature = "deploy")]
4666 #[tokio::test]
4667 async fn test_stream_ref() {
4668 let mut deployment = Deployment::new();
4669
4670 let mut flow = FlowBuilder::new();
4671 let external = flow.external::<()>();
4672 let p1 = flow.process::<()>();
4673
4674 let my_stream = p1.source_iter(q!(1..=5i32));
4676
4677 let stream_ref = my_stream.by_ref();
4678
4679 let out_port = p1
4681 .source_iter(q!([()]))
4682 .map(q!(|_| stream_ref.len() as i32))
4683 .send_bincode_external(&external);
4684
4685 my_stream.for_each(q!(|_| {}));
4687
4688 let nodes = flow
4689 .with_default_optimize()
4690 .with_process(&p1, deployment.Localhost())
4691 .with_external(&external, deployment.Localhost())
4692 .deploy(&mut deployment);
4693
4694 deployment.deploy().await.unwrap();
4695
4696 let mut out_recv = nodes.connect(out_port).await;
4697
4698 deployment.start().await.unwrap();
4699
4700 let result = out_recv.next().await.unwrap();
4701 assert_eq!(result, 5);
4703 }
4704
4705 #[cfg(feature = "deploy")]
4706 #[tokio::test]
4707 async fn test_stream_ref_contents() {
4708 let mut deployment = Deployment::new();
4709
4710 let mut flow = FlowBuilder::new();
4711 let external = flow.external::<()>();
4712 let p1 = flow.process::<()>();
4713
4714 let my_stream = p1.source_iter(q!(1..=3i32));
4716
4717 let stream_ref = my_stream.by_ref();
4718
4719 let out_port = p1
4721 .source_iter(q!([()]))
4722 .map(q!(|_| stream_ref.iter().sum::<i32>()))
4723 .send_bincode_external(&external);
4724
4725 my_stream.for_each(q!(|_| {}));
4726
4727 let nodes = flow
4728 .with_default_optimize()
4729 .with_process(&p1, deployment.Localhost())
4730 .with_external(&external, deployment.Localhost())
4731 .deploy(&mut deployment);
4732
4733 deployment.deploy().await.unwrap();
4734
4735 let mut out_recv = nodes.connect(out_port).await;
4736
4737 deployment.start().await.unwrap();
4738
4739 let result = out_recv.next().await.unwrap();
4740 assert_eq!(result, 6);
4742 }
4743
4744 #[cfg(feature = "deploy")]
4745 #[tokio::test]
4746 async fn test_stream_ref_no_consumer() {
4747 let mut deployment = Deployment::new();
4748
4749 let mut flow = FlowBuilder::new();
4750 let external = flow.external::<()>();
4751 let p1 = flow.process::<()>();
4752
4753 let my_stream = p1.source_iter(q!(1..=4i32));
4755
4756 let stream_ref = my_stream.by_ref();
4757
4758 let out_port = p1
4759 .source_iter(q!([()]))
4760 .map(q!(|_| stream_ref.len() as i32))
4761 .send_bincode_external(&external);
4762
4763 let nodes = flow
4764 .with_default_optimize()
4765 .with_process(&p1, deployment.Localhost())
4766 .with_external(&external, deployment.Localhost())
4767 .deploy(&mut deployment);
4768
4769 deployment.deploy().await.unwrap();
4770
4771 let mut out_recv = nodes.connect(out_port).await;
4772
4773 deployment.start().await.unwrap();
4774
4775 let result = out_recv.next().await.unwrap();
4776 assert_eq!(result, 4);
4777 }
4778
4779 #[cfg(feature = "deploy")]
4780 #[tokio::test]
4781 async fn test_stream_mut() {
4782 let mut deployment = Deployment::new();
4783
4784 let mut flow = FlowBuilder::new();
4785 let external = flow.external::<()>();
4786 let p1 = flow.process::<()>();
4787
4788 let my_stream = p1.source_iter(q!(1..=5i32));
4790
4791 let stream_mut = my_stream.by_mut();
4792
4793 let out_port = p1
4795 .source_iter(q!([()]))
4796 .map(q!(|_| {
4797 stream_mut.retain(|x| *x > 3);
4798 stream_mut.len() as i32
4799 }))
4800 .send_bincode_external(&external);
4801
4802 my_stream.for_each(q!(|_| {}));
4803
4804 let nodes = flow
4805 .with_default_optimize()
4806 .with_process(&p1, deployment.Localhost())
4807 .with_external(&external, deployment.Localhost())
4808 .deploy(&mut deployment);
4809
4810 deployment.deploy().await.unwrap();
4811
4812 let mut out_recv = nodes.connect(out_port).await;
4813
4814 deployment.start().await.unwrap();
4815
4816 let result = out_recv.next().await.unwrap();
4817 assert_eq!(result, 2);
4819 }
4820
4821 #[cfg(feature = "sim")]
4825 #[test]
4826 fn sim_map_with_mut_on_unordered_explores_multiple_states() {
4827 use crate::live_collections::sliced::sliced;
4828 use crate::live_collections::stream::ExactlyOnce;
4829 use crate::properties::manual_proof;
4830
4831 let mut flow = FlowBuilder::new();
4832 let node = flow.process::<()>();
4833
4834 let (trigger_send, trigger) = node.sim_input::<i32, TotalOrder, ExactlyOnce>();
4835
4836 let out_recv = sliced! {
4837 let batch = use(trigger, nondet!());
4838 let counter = batch.location().source_iter(q!(vec![0i32]))
4839 .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4840 let counter_mut = counter.by_mut();
4841 let items = batch.location().source_iter(q!(vec![1i32, 2])).weaken_ordering::<NoOrder>();
4842 items.map(q!(
4843 |x| {
4844 *counter_mut += x;
4845 *counter_mut
4846 },
4847 commutative = manual_proof!()
4848 ))
4849 }
4850 .sim_output();
4851
4852 let count = flow.sim().exhaustive(async || {
4853 trigger_send.send(1);
4854 let _all: Vec<i32> = out_recv.collect_sorted().await;
4855 });
4856
4857 assert_eq!(
4858 count, 2,
4859 "Expected 2 simulation instances due to mut on unordered input, got {}",
4860 count
4861 );
4862 }
4863
4864 #[cfg(feature = "sim")]
4868 #[test]
4869 #[ignore = "observe_nondet not yet supported for top-level bounded inputs (https://github.com/hydro-project/hydro/issues/2950)"]
4870 fn sim_map_with_mut_on_unordered_top_level() {
4871 use crate::properties::manual_proof;
4872
4873 let mut flow = FlowBuilder::new();
4874 let node = flow.process::<()>();
4875
4876 let counter = node
4877 .source_iter(q!(vec![0i32]))
4878 .fold(q!(|| 0i32), q!(|acc, v| *acc += v));
4879 let counter_mut = counter.by_mut();
4880
4881 let out_recv = node
4882 .source_iter(q!(vec![1i32, 2]))
4883 .weaken_ordering::<NoOrder>()
4884 .map(q!(
4885 |x| {
4886 *counter_mut += x;
4887 *counter_mut
4888 },
4889 commutative = manual_proof!()
4890 ))
4891 .assume_ordering::<TotalOrder>(nondet!())
4892 .sim_output();
4893
4894 counter.into_stream().for_each(q!(|_| {}));
4895
4896 let count = flow.sim().exhaustive(async || {
4897 let _all: Vec<i32> = out_recv.collect().await;
4898 });
4899
4900 assert_eq!(
4901 count, 2,
4902 "Expected 2 simulation instances due to mut on unordered input, got {}",
4903 count
4904 );
4905 }
4906}