hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(feature = "tokio")]
23use crate::location::TopLevel;
24#[cfg(stageleft_runtime)]
25use crate::location::dynamic::{DynLocation, LocationId};
26use crate::location::tick::Atomic;
27use crate::location::{Location, Tick, check_matching_location};
28use crate::nondet::{NonDet, nondet};
29use crate::properties::{
30 ApplyMonotoneStream, ApplyOrderPreservingSingleton, Proved, SingletonMapFuncAlgebra,
31 StreamMapFuncAlgebra, ValidMutCommutativityFor, ValidMutIdempotenceFor,
32};
33
/// A marker trait indicating which components of a [`Singleton`] may change.
///
/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
/// includes an additional variant [`Monotonic`], which means that the value will only grow.
pub trait SingletonBound {
    /// The [`Boundedness`] that this [`Singleton`] would be erased to.
    ///
    /// This is the bound a singleton carries after [`Singleton::ignore_monotonic`].
    type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;

    /// The [`SingletonBound`] of a [`Singleton`] that is produced from a [`Stream`] with
    /// [`Self`] boundedness.
    ///
    /// It is constrained to share the same [`SingletonBound::UnderlyingBound`] as [`Self`].
    type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;

    /// Returns the [`SingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> SingletonBoundKind;
}
48
// `Unbounded` erases to itself; a singleton produced from an unbounded stream is typed as
// `Monotonic`.
impl SingletonBound for Unbounded {
    type UnderlyingBound = Unbounded;

    type StreamToMonotone = Monotonic;

    fn bound_kind() -> SingletonBoundKind {
        SingletonBoundKind::Unbounded
    }
}
58
// `Bounded` erases to itself, and a singleton produced from a bounded stream stays `Bounded`.
impl SingletonBound for Bounded {
    type UnderlyingBound = Bounded;

    type StreamToMonotone = Bounded;

    fn bound_kind() -> SingletonBoundKind {
        SingletonBoundKind::Bounded
    }
}
68
/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
///
/// This is a type-level marker only; its underlying [`Boundedness`] is [`Unbounded`].
pub struct Monotonic;
71
// `Monotonic` erases to `Unbounded` once monotonicity is ignored, and maps to itself when a
// singleton is produced from a stream.
impl SingletonBound for Monotonic {
    type UnderlyingBound = Unbounded;

    type StreamToMonotone = Monotonic;

    fn bound_kind() -> SingletonBoundKind {
        SingletonBoundKind::Monotonic
    }
}
81
#[sealed]
#[diagnostic::on_unimplemented(
    message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait that is implemented for the [`Monotonic`] boundedness guarantee, as well as
/// for every bound that satisfies [`IsBounded`].
///
/// The trait is sealed, so it cannot be implemented outside of this crate.
pub trait IsMonotonic: SingletonBound {}
90
// `Monotonic` itself satisfies the monotonicity requirement.
#[sealed]
#[diagnostic::do_not_recommend]
impl IsMonotonic for Monotonic {}
94
// Any bound that is `IsBounded` is also accepted wherever `IsMonotonic` is required.
#[sealed]
#[diagnostic::do_not_recommend]
impl<B: IsBounded> IsMonotonic for B {}
98
/// A single Rust value that can asynchronously change over time.
///
/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
/// [`Unbounded`], the value will asynchronously change over time. A [`Monotonic`] singleton
/// may also change, but its value will only grow.
///
/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
/// a single number that will asynchronously change as events are processed. Singletons also appear
/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
/// such as getting the length of a batch of requests.
///
/// Type Parameters:
/// - `Type`: the type of the value in this singleton
/// - `Loc`: the [`Location`] where the singleton is materialized
/// - `Bound`: a [`SingletonBound`] tracking whether the value is [`Bounded`] (fixed),
///   [`Unbounded`] (changing asynchronously), or [`Monotonic`] (only growing)
pub struct Singleton<Type, Loc, Bound: SingletonBound> {
    /// The location where this singleton is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this singleton. Operators that consume the singleton swap it
    /// out for `HydroNode::Placeholder`.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// The flow state, cloned from the location at construction; used on drop to register a
    /// node that was never consumed.
    pub(crate) flow_state: FlowState,

    _phantom: PhantomData<(Type, Loc, Bound)>,
}
120
121impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
122 fn drop(&mut self) {
123 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
124 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
125 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
126 input: Box::new(ir_node),
127 op_metadata: HydroIrOpMetadata::new(),
128 });
129 }
130 }
131}
132
133impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
134where
135 T: Clone,
136 L: Location<'a>,
137{
138 fn from(value: Singleton<T, L, Bounded>) -> Self {
139 let location = value.location().clone();
140 Singleton::new(
141 location.clone(),
142 HydroNode::UnboundSingleton {
143 inner: Box::new(value.ir_node.replace(HydroNode::Placeholder)),
144 metadata: location
145 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
146 },
147 )
148 }
149}
150
151impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
152where
153 L: Location<'a>,
154{
155 type Location = Tick<L>;
156
157 fn location(&self) -> &Self::Location {
158 self.location()
159 }
160
161 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
162 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
163 location.clone(),
164 HydroNode::DeferTick {
165 input: Box::new(HydroNode::CycleSource {
166 cycle_id,
167 metadata: location.new_node_metadata(Self::collection_kind()),
168 }),
169 metadata: location
170 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
171 },
172 );
173
174 from_previous_tick.unwrap_or(initial)
175 }
176}
177
178impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
179where
180 L: Location<'a>,
181{
182 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
183 assert_eq!(
184 Location::id(&self.location),
185 expected_location,
186 "locations do not match"
187 );
188 self.location
189 .flow_state()
190 .borrow_mut()
191 .push_root(HydroRoot::CycleSink {
192 cycle_id,
193 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
194 op_metadata: HydroIrOpMetadata::new(),
195 });
196 }
197}
198
199impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
200where
201 L: Location<'a>,
202{
203 type Location = L;
204
205 fn create_source(cycle_id: CycleId, location: L) -> Self {
206 Singleton::new(
207 location.clone(),
208 HydroNode::CycleSource {
209 cycle_id,
210 metadata: location.new_node_metadata(Self::collection_kind()),
211 },
212 )
213 }
214}
215
216impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
217where
218 L: Location<'a>,
219{
220 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
221 assert_eq!(
222 Location::id(&self.location),
223 expected_location,
224 "locations do not match"
225 );
226 self.location
227 .flow_state()
228 .borrow_mut()
229 .push_root(HydroRoot::CycleSink {
230 cycle_id,
231 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
232 op_metadata: HydroIrOpMetadata::new(),
233 });
234 }
235}
236
237impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
238where
239 T: Clone,
240 L: Location<'a>,
241{
242 fn clone(&self) -> Self {
243 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
244 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
245 *self.ir_node.borrow_mut() = HydroNode::Tee {
246 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
247 metadata: self.location.new_node_metadata(Self::collection_kind()),
248 };
249 }
250
251 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
252 Singleton {
253 location: self.location.clone(),
254 flow_state: self.flow_state.clone(),
255 ir_node: HydroNode::Tee {
256 inner: SharedNode(inner.0.clone()),
257 metadata: metadata.clone(),
258 }
259 .into(),
260 _phantom: PhantomData,
261 }
262 } else {
263 unreachable!()
264 }
265 }
266}
267
/// Zips a singleton with an optional that lives in the same tick.
///
/// The singleton is first converted into an [`Optional`] with the erased bound
/// `B::UnderlyingBound`, and the pair is then passed to `super::optional::zip_inside_tick`.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
    me: Singleton<T, Tick<L>, B>,
    other: Optional<O, Tick<L>, B::UnderlyingBound>,
) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
    // The explicit annotation selects the target type of the `into()` conversion.
    let me_as_optional: Optional<T, Tick<L>, B::UnderlyingBound> = me.into();
    super::optional::zip_inside_tick(me_as_optional, other)
}
276
277impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
278where
279 L: Location<'a>,
280{
281 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
282 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
283 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
284 let flow_state = location.flow_state().clone();
285 Singleton {
286 location,
287 flow_state,
288 ir_node: RefCell::new(ir_node),
289 _phantom: PhantomData,
290 }
291 }
292
293 pub(crate) fn collection_kind() -> CollectionKind {
294 CollectionKind::Singleton {
295 bound: B::bound_kind(),
296 element_type: stageleft::quote_type::<T>().into(),
297 }
298 }
299
    /// Returns a reference to the [`Location`] where this singleton is being materialized.
    pub fn location(&self) -> &L {
        &self.location
    }
304
305 /// Creates a lightweight reference handle to this singleton that can be captured
306 /// inside `q!()` closures. The handle resolves to `&T` at runtime.
307 ///
308 /// The singleton must be bounded, otherwise reading it would be non-deterministic.
309 ///
310 /// ```rust
311 /// # #[cfg(feature = "deploy")] {
312 /// # use hydro_lang::prelude::*;
313 /// # use futures::StreamExt;
314 /// # tokio_test::block_on(async {
315 /// # let mut deployment = hydro_deploy::Deployment::new();
316 /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
317 /// # let process = builder.process::<()>();
318 /// # let external = builder.external::<()>();
319 /// let my_count = process
320 /// .source_iter(q!(0..5i32))
321 /// .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
322 /// let count_ref = my_count.by_ref();
323 /// let out_port = process
324 /// .source_iter(q!(1..=3i32))
325 /// .map(q!(|x| x + *count_ref))
326 /// .send_bincode_external(&external);
327 /// # let nodes = builder
328 /// # .with_default_optimize()
329 /// # .with_process(&process, deployment.Localhost())
330 /// # .with_external(&external, deployment.Localhost())
331 /// # .deploy(&mut deployment);
332 /// # deployment.deploy().await.unwrap();
333 /// # let mut out_recv = nodes.connect(out_port).await;
334 /// # deployment.start().await.unwrap();
335 /// # let mut results = Vec::new();
336 /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
337 /// # results.sort();
338 /// // fold(0..5) = 10, so results are 11, 12, 13
339 /// # assert_eq!(results, vec![11, 12, 13]);
340 /// # });
341 /// # }
342 /// ```
    pub fn by_ref(&self) -> crate::handoff_ref::SingletonRef<'a, '_, T, L>
    where
        B: IsBounded,
    {
        // The handle is built from a borrow of this singleton's IR-node cell; `self` is not
        // consumed.
        crate::handoff_ref::SingletonRef::new(&self.ir_node)
    }
349
350 /// Returns a mutable reference handle to this singleton that can be captured inside `q!()`
351 /// closures. The handle resolves to `&mut T` at runtime.
352 ///
353 /// Mutable references are ordered via access groups in the generated DFIR code, ensuring
354 /// exclusive access at each point in the execution order.
355 ///
356 /// ```rust
357 /// # #[cfg(feature = "deploy")] {
358 /// # use hydro_lang::prelude::*;
359 /// # use futures::StreamExt;
360 /// # tokio_test::block_on(async {
361 /// # let mut deployment = hydro_deploy::Deployment::new();
362 /// # let mut builder = hydro_lang::compile::builder::FlowBuilder::new();
363 /// # let process = builder.process::<()>();
364 /// # let external = builder.external::<()>();
365 /// let my_count = process
366 /// .source_iter(q!(0..5i32))
367 /// .fold(q!(|| 0i32), q!(|acc: &mut i32, x| *acc += x));
368 /// let count_mut = my_count.by_mut();
369 /// let out_port = process
370 /// .source_iter(q!(1..=3i32))
371 /// .map(q!(|x| {
372 /// *count_mut += x;
373 /// *count_mut
374 /// }))
375 /// .send_bincode_external(&external);
376 /// # let nodes = builder
377 /// # .with_default_optimize()
378 /// # .with_process(&process, deployment.Localhost())
379 /// # .with_external(&external, deployment.Localhost())
380 /// # .deploy(&mut deployment);
381 /// # deployment.deploy().await.unwrap();
382 /// # let mut out_recv = nodes.connect(out_port).await;
383 /// # deployment.start().await.unwrap();
384 /// # let mut results = Vec::new();
385 /// # for _ in 0..3 { results.push(out_recv.next().await.unwrap()); }
386 /// # results.sort();
387 /// // fold(0..5) = 10, then each map adds x: results are 11, 13, 16
388 /// # assert_eq!(results, vec![11, 13, 16]);
389 /// # });
390 /// # }
391 /// ```
    pub fn by_mut(&self) -> crate::handoff_ref::SingletonMut<'a, '_, T, L>
    where
        B: IsBounded,
    {
        // The handle is built from a borrow of this singleton's IR-node cell; `self` is not
        // consumed.
        crate::handoff_ref::SingletonMut::new(&self.ir_node)
    }
398
399 /// Weakens the consistency of this live collection to not guarantee any consistency across
400 /// cluster members (if this collection is on a cluster).
401 pub fn weaken_consistency(self) -> Singleton<T, L::DropConsistency, B>
402 where
403 L: Location<'a>,
404 {
405 if L::consistency()
406 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
407 {
408 // already no consistency
409 Singleton::new(
410 self.location.drop_consistency(),
411 self.ir_node.replace(HydroNode::Placeholder),
412 )
413 } else {
414 Singleton::new(
415 self.location.drop_consistency(),
416 HydroNode::Cast {
417 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
418 metadata:
419 self.location
420 .clone()
421 .drop_consistency()
422 .new_node_metadata(
423 Singleton::<T, L::DropConsistency, B>::collection_kind(),
424 ),
425 },
426 )
427 }
428 }
429
430 /// Casts this live collection to have the consistency guarantees specified in the given
431 /// location type parameter. The developer must ensure that the strengthened consistency
432 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
433 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
434 self,
435 _proof: impl crate::properties::ConsistencyProof,
436 ) -> Singleton<T, L2, B>
437 where
438 L: Location<'a>,
439 {
440 if L::consistency() == L2::consistency() {
441 Singleton::new(
442 self.location.with_consistency_of(),
443 self.ir_node.replace(HydroNode::Placeholder),
444 )
445 } else {
446 Singleton::new(
447 self.location.with_consistency_of(),
448 HydroNode::AssertIsConsistent {
449 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
450 trusted: false,
451 metadata: self
452 .location
453 .clone()
454 .with_consistency_of::<L2>()
455 .new_node_metadata(Singleton::<T, L2, B>::collection_kind()),
456 },
457 )
458 }
459 }
460
461 /// Drops the monotonicity property of the [`Singleton`].
462 pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
463 if B::bound_kind() == B::UnderlyingBound::bound_kind() {
464 Singleton::new(
465 self.location.clone(),
466 self.ir_node.replace(HydroNode::Placeholder),
467 )
468 } else {
469 Singleton::new(
470 self.location.clone(),
471 HydroNode::Cast {
472 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
473 metadata:
474 self.location.new_node_metadata(
475 Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
476 ),
477 },
478 )
479 }
480 }
481
482 /// Transforms the singleton value by applying a function `f` to it,
483 /// continuously as the input is updated.
484 ///
485 /// # Example
486 /// ```rust
487 /// # #[cfg(feature = "deploy")] {
488 /// # use hydro_lang::prelude::*;
489 /// # use futures::StreamExt;
490 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
491 /// let tick = process.tick();
492 /// let singleton = tick.singleton(q!(5));
493 /// singleton.map(q!(|v| v * 2)).all_ticks()
494 /// # }, |mut stream| async move {
495 /// // 10
496 /// # assert_eq!(stream.next().await.unwrap(), 10);
497 /// # }));
498 /// # }
499 /// ```
500 pub fn map<U, F, OP, B2: SingletonBound>(
501 self,
502 f: impl IntoQuotedMut<'a, F, L, SingletonMapFuncAlgebra<OP>>,
503 ) -> Singleton<U, L, B2>
504 where
505 F: Fn(T) -> U + 'a,
506 B: ApplyOrderPreservingSingleton<OP, B2>,
507 {
508 let (f, proof) = f.splice_fn1_ctx_props(&self.location);
509 proof.register_proof(&f);
510 let f = f.into();
511 Singleton::new(
512 self.location.clone(),
513 HydroNode::Map {
514 f,
515 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
516 metadata: self
517 .location
518 .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
519 },
520 )
521 }
522
523 /// Transforms the singleton value by applying a function `f` to it and then flattening
524 /// the result into a stream, preserving the order of elements.
525 ///
526 /// The function `f` is applied to the singleton value to produce an iterator, and all items
527 /// from that iterator are emitted in the output stream in deterministic order.
528 ///
529 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
530 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
531 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
532 ///
533 /// # Example
534 /// ```rust
535 /// # #[cfg(feature = "deploy")] {
536 /// # use hydro_lang::prelude::*;
537 /// # use futures::StreamExt;
538 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
539 /// let tick = process.tick();
540 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
541 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
542 /// # }, |mut stream| async move {
543 /// // 1, 2, 3
544 /// # for w in vec![1, 2, 3] {
545 /// # assert_eq!(stream.next().await.unwrap(), w);
546 /// # }
547 /// # }));
548 /// # }
549 /// ```
550 pub fn flat_map_ordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
551 self,
552 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
553 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
554 where
555 B: IsBounded,
556 I: IntoIterator<Item = U>,
557 F: FnMut(T) -> I + 'a,
558 C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
559 Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
560 {
561 self.into_stream().flat_map_ordered(f)
562 }
563
564 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
565 /// for the output type `I` to produce items in any order.
566 ///
567 /// The function `f` is applied to the singleton value to produce an iterator, and all items
568 /// from that iterator are emitted in the output stream in non-deterministic order.
569 ///
570 /// # Example
571 /// ```rust
572 /// # #[cfg(feature = "deploy")] {
573 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
574 /// # use futures::StreamExt;
575 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
576 /// let tick = process.tick();
577 /// let singleton = tick.singleton(q!(
578 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
579 /// ));
580 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
581 /// # }, |mut stream| async move {
582 /// // 1, 2, 3, but in no particular order
583 /// # let mut results = Vec::new();
584 /// # for _ in 0..3 {
585 /// # results.push(stream.next().await.unwrap());
586 /// # }
587 /// # results.sort();
588 /// # assert_eq!(results, vec![1, 2, 3]);
589 /// # }));
590 /// # }
591 /// ```
592 pub fn flat_map_unordered<U, I, F, C, Idemp, const WAS_MUT: bool>(
593 self,
594 f: impl IntoQuotedMut<'a, F, L, StreamMapFuncAlgebra<C, Idemp>>,
595 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
596 where
597 B: IsBounded,
598 I: IntoIterator<Item = U>,
599 F: FnMut(T) -> I + 'a,
600 C: ValidMutCommutativityFor<F, T, I, TotalOrder, WAS_MUT>,
601 Idemp: ValidMutIdempotenceFor<F, T, I, ExactlyOnce, WAS_MUT>,
602 {
603 self.into_stream().flat_map_unordered(f)
604 }
605
606 /// Flattens the singleton value into a stream, preserving the order of elements.
607 ///
608 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
609 /// are emitted in the output stream in deterministic order.
610 ///
611 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
612 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
613 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
614 ///
615 /// # Example
616 /// ```rust
617 /// # #[cfg(feature = "deploy")] {
618 /// # use hydro_lang::prelude::*;
619 /// # use futures::StreamExt;
620 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
621 /// let tick = process.tick();
622 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
623 /// singleton.flatten_ordered().all_ticks()
624 /// # }, |mut stream| async move {
625 /// // 1, 2, 3
626 /// # for w in vec![1, 2, 3] {
627 /// # assert_eq!(stream.next().await.unwrap(), w);
628 /// # }
629 /// # }));
630 /// # }
631 /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
    where
        B: IsBounded,
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure.
        self.flat_map_ordered(q!(|x| x))
    }
639
640 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
641 /// for the element type `T` to produce items in any order.
642 ///
643 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
644 /// are emitted in the output stream in non-deterministic order.
645 ///
646 /// # Example
647 /// ```rust
648 /// # #[cfg(feature = "deploy")] {
649 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
650 /// # use futures::StreamExt;
651 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
652 /// let tick = process.tick();
653 /// let singleton = tick.singleton(q!(
654 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
655 /// ));
656 /// singleton.flatten_unordered().all_ticks()
657 /// # }, |mut stream| async move {
658 /// // 1, 2, 3, but in no particular order
659 /// # let mut results = Vec::new();
660 /// # for _ in 0..3 {
661 /// # results.push(stream.next().await.unwrap());
662 /// # }
663 /// # results.sort();
664 /// # assert_eq!(results, vec![1, 2, 3]);
665 /// # }));
666 /// # }
667 /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
    where
        B: IsBounded,
        T: IntoIterator<Item = U>,
    {
        // Flattening is a flat-map with the identity closure.
        self.flat_map_unordered(q!(|x| x))
    }
675
676 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
677 ///
678 /// If the predicate returns `true`, the output optional contains the same value.
679 /// If the predicate returns `false`, the output optional is empty.
680 ///
681 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
682 /// not modify or take ownership of the value. If you need to modify the value while filtering
683 /// use [`Singleton::filter_map`] instead.
684 ///
685 /// # Example
686 /// ```rust
687 /// # #[cfg(feature = "deploy")] {
688 /// # use hydro_lang::prelude::*;
689 /// # use futures::StreamExt;
690 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
691 /// let tick = process.tick();
692 /// let singleton = tick.singleton(q!(5));
693 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
694 /// # }, |mut stream| async move {
695 /// // 5
696 /// # assert_eq!(stream.next().await.unwrap(), 5);
697 /// # }));
698 /// # }
699 /// ```
700 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
701 where
702 F: Fn(&T) -> bool + 'a,
703 {
704 let f = f.splice_fn1_borrow_ctx(&self.location).into();
705 Optional::new(
706 self.location.clone(),
707 HydroNode::Filter {
708 f,
709 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
710 metadata: self
711 .location
712 .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
713 },
714 )
715 }
716
717 /// An operator that both filters and maps. It yields the value only if the supplied
718 /// closure `f` returns `Some(value)`.
719 ///
720 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
721 /// If the closure returns `None`, the output optional is empty.
722 ///
723 /// # Example
724 /// ```rust
725 /// # #[cfg(feature = "deploy")] {
726 /// # use hydro_lang::prelude::*;
727 /// # use futures::StreamExt;
728 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
729 /// let tick = process.tick();
730 /// let singleton = tick.singleton(q!("42"));
731 /// singleton
732 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
733 /// .all_ticks()
734 /// # }, |mut stream| async move {
735 /// // 42
736 /// # assert_eq!(stream.next().await.unwrap(), 42);
737 /// # }));
738 /// # }
739 /// ```
740 pub fn filter_map<U, F>(
741 self,
742 f: impl IntoQuotedMut<'a, F, L>,
743 ) -> Optional<U, L, B::UnderlyingBound>
744 where
745 F: Fn(T) -> Option<U> + 'a,
746 {
747 let f = f.splice_fn1_ctx(&self.location).into();
748 Optional::new(
749 self.location.clone(),
750 HydroNode::FilterMap {
751 f,
752 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
753 metadata: self
754 .location
755 .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
756 },
757 )
758 }
759
760 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
761 ///
762 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
763 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
764 /// non-null. This is useful for combining several pieces of state together.
765 ///
766 /// # Example
767 /// ```rust
768 /// # #[cfg(feature = "deploy")] {
769 /// # use hydro_lang::prelude::*;
770 /// # use futures::StreamExt;
771 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
772 /// let tick = process.tick();
773 /// let numbers = process
774 /// .source_iter(q!(vec![123, 456]))
775 /// .batch(&tick, nondet!(/** test */));
776 /// let count = numbers.clone().count(); // Singleton
777 /// let max = numbers.max(); // Optional
778 /// count.zip(max).all_ticks()
779 /// # }, |mut stream| async move {
780 /// // [(2, 456)]
781 /// # for w in vec![(2, 456)] {
782 /// # assert_eq!(stream.next().await.unwrap(), w);
783 /// # }
784 /// # }));
785 /// # }
786 /// ```
    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
    where
        Self: ZipResult<'a, O, Location = L>,
        B: IsBounded,
    {
        // Both operands are checked to live at matching locations before any node is built.
        check_matching_location(&self.location, &Self::other_location(&other));

        // When `L` is top-level and the location can provide a tick, the zip is performed
        // inside that tick on snapshots of both operands, and `latest()` is applied to the
        // zipped result.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            let self_location = self.location().clone();
            let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                // The other operand's IR node is wrapped in a `Cast` and typed as an
                // `Optional`, so it can be snapshotted like `self`.
                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
                    other_location.clone(),
                    HydroNode::Cast {
                        inner: Box::new(Self::other_ir_node(other)),
                        metadata: other_location.new_node_metadata(Optional::<
                            <Self as ZipResult<'a, O>>::OtherType,
                            Tick<L>,
                            Bounded,
                        >::collection_kind(
                        )),
                    },
                )
                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            // Re-wrap the resulting IR node in the output type chosen by `ZipResult`.
            Self::make(self_location, out.ir_node.replace(HydroNode::Placeholder))
        } else {
            // Otherwise a `CrossSingleton` node is emitted directly over the two inputs; its
            // metadata describes an `Optional` of the zipped element type.
            Self::make(
                self.location.clone(),
                HydroNode::CrossSingleton {
                    left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    right: Box::new(Self::other_ir_node(other)),
                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
                        bound: B::BOUND_KIND,
                        element_type: stageleft::quote_type::<
                            <Self as ZipResult<'a, O>>::ElementType,
                        >()
                        .into(),
                    }),
                },
            )
        }
    }
835
836 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
837 /// boolean signal is `true`, otherwise the output is null.
838 ///
839 /// # Example
840 /// ```rust
841 /// # #[cfg(feature = "deploy")] {
842 /// # use hydro_lang::prelude::*;
843 /// # use futures::StreamExt;
844 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
845 /// let tick = process.tick();
846 /// // ticks are lazy by default, forces the second tick to run
847 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
848 ///
849 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
850 /// let batch_first_tick = process
851 /// .source_iter(q!(vec![1]))
852 /// .batch(&tick, nondet!(/** test */));
853 /// let batch_second_tick = process
854 /// .source_iter(q!(vec![1, 2, 3]))
855 /// .batch(&tick, nondet!(/** test */))
856 /// .defer_tick();
857 /// batch_first_tick.chain(batch_second_tick).count()
858 /// .filter_if(signal)
859 /// .all_ticks()
860 /// # }, |mut stream| async move {
861 /// // [1]
862 /// # for w in vec![1] {
863 /// # assert_eq!(stream.next().await.unwrap(), w);
864 /// # }
865 /// # }));
866 /// # }
867 /// ```
868 pub fn filter_if(
869 self,
870 signal: Singleton<bool, L, B>,
871 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
872 where
873 B: IsBounded,
874 {
875 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
876 }
877
878 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
880 ///
881 /// Useful for conditionally processing, such as only emitting a singleton's value outside
882 /// a tick if some other condition is satisfied.
883 ///
884 /// # Example
885 /// ```rust
886 /// # #[cfg(feature = "deploy")] {
887 /// # use hydro_lang::prelude::*;
888 /// # use futures::StreamExt;
889 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
890 /// let tick = process.tick();
891 /// // ticks are lazy by default, forces the second tick to run
892 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
893 ///
894 /// let batch_first_tick = process
895 /// .source_iter(q!(vec![1]))
896 /// .batch(&tick, nondet!(/** test */));
897 /// let batch_second_tick = process
898 /// .source_iter(q!(vec![1, 2, 3]))
899 /// .batch(&tick, nondet!(/** test */))
900 /// .defer_tick(); // appears on the second tick
901 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
902 /// batch_first_tick.chain(batch_second_tick).count()
903 /// .filter_if_some(some_on_first_tick)
904 /// .all_ticks()
905 /// # }, |mut stream| async move {
906 /// // [1]
907 /// # for w in vec![1] {
908 /// # assert_eq!(stream.next().await.unwrap(), w);
909 /// # }
910 /// # }));
911 /// # }
912 /// ```
913 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
914 pub fn filter_if_some<U>(
915 self,
916 signal: Optional<U, L, B>,
917 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
918 where
919 B: IsBounded,
920 {
921 self.filter_if(signal.is_some())
922 }
923
924 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
925 /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
926 ///
927 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
928 /// the condition.
929 ///
930 /// # Example
931 /// ```rust
932 /// # #[cfg(feature = "deploy")] {
933 /// # use hydro_lang::prelude::*;
934 /// # use futures::StreamExt;
935 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
936 /// let tick = process.tick();
937 /// // ticks are lazy by default, forces the second tick to run
938 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
939 ///
940 /// let batch_first_tick = process
941 /// .source_iter(q!(vec![1]))
942 /// .batch(&tick, nondet!(/** test */));
943 /// let batch_second_tick = process
944 /// .source_iter(q!(vec![1, 2, 3]))
945 /// .batch(&tick, nondet!(/** test */))
946 /// .defer_tick(); // appears on the second tick
947 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
948 /// batch_first_tick.chain(batch_second_tick).count()
949 /// .filter_if_none(some_on_first_tick)
950 /// .all_ticks()
951 /// # }, |mut stream| async move {
952 /// // [3]
953 /// # for w in vec![3] {
954 /// # assert_eq!(stream.next().await.unwrap(), w);
955 /// # }
956 /// # }));
957 /// # }
958 /// ```
959 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
960 pub fn filter_if_none<U>(
961 self,
962 other: Optional<U, L, B>,
963 ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
964 where
965 B: IsBounded,
966 {
967 self.filter_if(other.is_none())
968 }
969
970 /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
971 ///
972 /// # Example
973 /// ```rust
974 /// # #[cfg(feature = "deploy")] {
975 /// # use hydro_lang::prelude::*;
976 /// # use futures::StreamExt;
977 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
978 /// let tick = process.tick();
979 /// let a = tick.singleton(q!(5));
980 /// let b = tick.singleton(q!(5));
981 /// a.equals(b).all_ticks()
982 /// # }, |mut stream| async move {
983 /// // [true]
984 /// # assert_eq!(stream.next().await.unwrap(), true);
985 /// # }));
986 /// # }
987 /// ```
988 pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
989 where
990 T: PartialEq,
991 B: IsBounded,
992 {
993 self.zip(other).map(q!(|(a, b)| a == b))
994 }
995
/// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
/// greater than or equal to the provided threshold. The event will have the value of the
/// given threshold.
///
/// This requires the incoming singleton to be monotonic, because otherwise the detection of
/// the threshold would be non-deterministic.
///
/// Two strategies are used depending on whether this singleton can be treated as bounded
/// (see the `match` below): a bounded value is compared against the threshold once, while
/// an unbounded one is re-checked against a "remaining threshold" kept as state.
///
/// # Example
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let a = // singleton 1 ~> 5 ~> 10
/// # process.singleton(q!(5));
/// let b = process.singleton(q!(4));
/// a.threshold_greater_or_equal(b)
/// # }, |mut stream| async move {
/// // [4]
/// # assert_eq!(stream.next().await.unwrap(), 4);
/// # }));
/// # }
/// ```
pub fn threshold_greater_or_equal<B2: IsBounded>(
    self,
    threshold: Singleton<T, L, B2>,
) -> Stream<T, L, B::UnderlyingBound>
where
    T: Clone + PartialOrd,
    B: IsMonotonic,
{
    let threshold = threshold.make_bounded();
    // Captured up front because `self` is consumed by `try_make_bounded` below.
    let self_location = self.location().clone();
    match self.try_make_bounded() {
        Ok(bounded) => {
            // Bounded case: compare the value against the threshold once, emitting the
            // threshold unless the value is strictly below it.
            let uncasted = threshold
                .zip(bounded)
                .into_stream()
                .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));

            // Re-wrap the same IR node so the stream carries the declared return type's
            // boundedness parameter (`B::UnderlyingBound`).
            Stream::new(
                uncasted.location.clone(),
                uncasted.ir_node.replace(HydroNode::Placeholder),
            )
        }
        Err(me) => {
            // Unbounded case: repeatedly compare snapshots of the value with the threshold
            // that has not been passed yet.
            let uncasted = sliced! {
                let me = use(me, nondet!(/** thresholds are deterministic */));
                // State starts out as the threshold itself, viewed as an `Optional`.
                let mut remaining_threshold = use::state(|l| {
                    let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
                    as_option
                });

                // `not_passed`: the value is still below the threshold; `passed`: it reached it.
                let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
                // Only a threshold that has not been passed is carried forward as state, so
                // (presumably) a passed threshold is emitted a single time.
                remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
                passed.map(q!(|(t, _)| t))
            };

            // Re-wrap the IR node at this singleton's original location with the declared
            // return type.
            Stream::new(
                self_location,
                uncasted.ir_node.replace(HydroNode::Placeholder),
            )
        }
    }
}
1061
1062 /// An operator which allows you to "name" a `HydroNode`.
1063 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
1064 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
1065 {
1066 let mut node = self.ir_node.borrow_mut();
1067 let metadata = node.metadata_mut();
1068 metadata.tag = Some(name.to_owned());
1069 }
1070 self
1071 }
1072}
1073
/// Enables the `!` operator on boolean singletons.
///
/// The result is produced by mapping the value through boolean negation, and its bound is
/// the underlying [`Boundedness`] of the input (`B::UnderlyingBound`).
impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
    type Output = Singleton<bool, L, B::UnderlyingBound>;

    /// Returns a singleton holding the logical negation of this singleton's value.
    fn not(self) -> Self::Output {
        self.map(q!(|b| !b))
    }
}
1081
impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
where
    L: Location<'a>,
{
    /// Converts a `Singleton<Option<T>, L, B>` into an `Optional<T, L, B::UnderlyingBound>`
    /// by unwrapping the inner `Option`.
    ///
    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
    /// `Option<T>` directly. If the singleton's value is `Some(v)`, the resulting
    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
    ///
    /// # Example
    /// ```rust
    /// # #[cfg(feature = "deploy")] {
    /// # use hydro_lang::prelude::*;
    /// # use futures::StreamExt;
    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
    /// let tick = process.tick();
    /// let singleton = tick.singleton(q!(Some(42)));
    /// singleton.into_optional().all_ticks()
    /// # }, |mut stream| async move {
    /// // 42
    /// # assert_eq!(stream.next().await.unwrap(), 42);
    /// # }));
    /// # }
    /// ```
    pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
        self.filter_map(q!(|v| v))
    }
}
1112
1113impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
1114where
1115 L: Location<'a>,
1116{
1117 /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
1118 ///
1119 /// # Example
1120 /// ```rust
1121 /// # #[cfg(feature = "deploy")] {
1122 /// # use hydro_lang::prelude::*;
1123 /// # use futures::StreamExt;
1124 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1125 /// let tick = process.tick();
1126 /// // ticks are lazy by default, forces the second tick to run
1127 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1128 ///
1129 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1130 /// let b = tick.singleton(q!(true)); // true, true
1131 /// a.and(b).all_ticks()
1132 /// # }, |mut stream| async move {
1133 /// // [true, false]
1134 /// # for w in vec![true, false] {
1135 /// # assert_eq!(stream.next().await.unwrap(), w);
1136 /// # }
1137 /// # }));
1138 /// # }
1139 /// ```
1140 pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1141 where
1142 B: IsBounded,
1143 {
1144 self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1145 }
1146
1147 /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1148 ///
1149 /// # Example
1150 /// ```rust
1151 /// # #[cfg(feature = "deploy")] {
1152 /// # use hydro_lang::prelude::*;
1153 /// # use futures::StreamExt;
1154 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1155 /// let tick = process.tick();
1156 /// // ticks are lazy by default, forces the second tick to run
1157 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1158 ///
1159 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1160 /// let b = tick.singleton(q!(false)); // false, false
1161 /// a.or(b).all_ticks()
1162 /// # }, |mut stream| async move {
1163 /// // [true, false]
1164 /// # for w in vec![true, false] {
1165 /// # assert_eq!(stream.next().await.unwrap(), w);
1166 /// # }
1167 /// # }));
1168 /// # }
1169 /// ```
1170 pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1171 where
1172 B: IsBounded,
1173 {
1174 self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1175 }
1176}
1177
1178impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1179where
1180 L: Location<'a>,
1181{
1182 /// Returns a singleton value corresponding to the latest snapshot of the singleton
1183 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1184 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1185 /// all snapshots of this singleton into the atomic-associated tick will observe the
1186 /// same value each tick.
1187 ///
1188 /// # Non-Determinism
1189 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1190 /// the output singleton has a non-deterministic value since the snapshot can be at an
1191 /// arbitrary point in time.
1192 pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1193 self,
1194 tick: &Tick<L2>,
1195 _nondet: NonDet,
1196 ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1197 Singleton::new(
1198 tick.drop_consistency(),
1199 HydroNode::Batch {
1200 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1201 metadata: tick
1202 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1203 },
1204 )
1205 }
1206
1207 /// Returns this singleton back into a top-level, asynchronous execution context where updates
1208 /// to the value will be asynchronously propagated.
1209 pub fn end_atomic(self) -> Singleton<T, L, B> {
1210 Singleton::new(
1211 self.location.tick.l.clone(),
1212 HydroNode::EndAtomic {
1213 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1214 metadata: self
1215 .location
1216 .tick
1217 .l
1218 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1219 },
1220 )
1221 }
1222}
1223
1224impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1225where
1226 L: Location<'a>,
1227{
1228 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1229 /// will observe the same version of the value and will be executed synchronously before any
1230 /// outputs are yielded (in [`Optional::end_atomic`]).
1231 ///
1232 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1233 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1234 /// a different version).
1235 pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1236 let id = self.location.flow_state().borrow_mut().next_clock_id();
1237 let out_location = Atomic {
1238 tick: Tick {
1239 id,
1240 l: self.location.clone(),
1241 },
1242 };
1243 Singleton::new(
1244 out_location.clone(),
1245 HydroNode::BeginAtomic {
1246 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1247 metadata: out_location
1248 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1249 },
1250 )
1251 }
1252
1253 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1254 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1255 /// relevant data that contributed to the snapshot at tick `t`.
1256 ///
1257 /// # Non-Determinism
1258 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1259 /// the output singleton has a non-deterministic value since the snapshot can be at an
1260 /// arbitrary point in time.
1261 pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1262 self,
1263 tick: &Tick<L2>,
1264 _nondet: NonDet,
1265 ) -> Singleton<T, Tick<L::DropConsistency>, Bounded> {
1266 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1267 Singleton::new(
1268 tick.drop_consistency(),
1269 HydroNode::Batch {
1270 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1271 metadata: tick
1272 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1273 },
1274 )
1275 }
1276
/// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
/// with order corresponding to increasing prefixes of data contributing to the singleton.
///
/// The returned stream is typed [`AtLeastOnce`], so consumers must tolerate repeated
/// values.
///
/// # Non-Determinism
/// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
/// to non-deterministic batching and arrival of inputs, the output stream is
/// non-deterministic.
pub fn sample_eager(
    self,
    nondet: NonDet,
) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce> {
    // Each slice observes one snapshot of the singleton and emits it as a stream element.
    sliced! {
        let snapshot = use(self, nondet);
        snapshot.into_stream()
    }
    // Relax the retry guarantee to match the `AtLeastOnce` return type.
    .weaken_retries()
}
1294
/// Given a time interval, returns a stream corresponding to snapshots of the singleton
/// value taken at various points in time. Because the input singleton may be
/// [`Unbounded`], there are no guarantees on what these snapshots are other than they
/// represent the value of the singleton given some prefix of the streams leading up to
/// it.
///
/// Only available with the `tokio` feature and on [`TopLevel`] locations.
///
/// # Non-Determinism
/// The output stream is non-deterministic in which elements are sampled, since this
/// is controlled by a clock.
#[cfg(feature = "tokio")]
pub fn sample_every(
    self,
    interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
    nondet: NonDet,
) -> Stream<T, L::DropConsistency, Unbounded, TotalOrder, AtLeastOnce>
where
    L: TopLevel<'a>,
{
    // Interval source at this singleton's location; its events drive the sampling.
    let samples = self.location.source_interval(interval);
    sliced! {
        let snapshot = use(self, nondet);
        let sample_batch = use(samples, nondet);

        // Emit the snapshot only in slices where at least one interval event arrived.
        snapshot.filter_if(sample_batch.first().is_some()).into_stream()
    }
    // Relax the retry guarantee to match the `AtLeastOnce` return type.
    .weaken_retries()
}
1322
1323 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1324 /// implies that `B == Bounded`.
1325 pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1326 where
1327 B: IsBounded,
1328 {
1329 Singleton::new(
1330 self.location.clone(),
1331 self.ir_node.replace(HydroNode::Placeholder),
1332 )
1333 }
1334
1335 #[expect(clippy::result_large_err, reason = "internal use only")]
1336 fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1337 if B::UnderlyingBound::BOUNDED {
1338 Ok(Singleton::new(
1339 self.location.clone(),
1340 self.ir_node.replace(HydroNode::Placeholder),
1341 ))
1342 } else {
1343 Err(self)
1344 }
1345 }
1346
1347 /// Clones this bounded singleton into a tick, returning a singleton that has the
1348 /// same value as the outer singleton. Because the outer singleton is bounded, this
1349 /// is deterministic because there is only a single immutable version.
1350 pub fn clone_into_tick<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1351 self,
1352 tick: &Tick<L2>,
1353 ) -> Singleton<T, Tick<L2>, Bounded>
1354 where
1355 B: IsBounded,
1356 T: Clone,
1357 {
1358 // TODO(shadaj): avoid printing simulator logs for this snapshot
1359 let inner = self.snapshot(
1360 tick,
1361 nondet!(/** bounded top-level singleton so deterministic */),
1362 );
1363 Singleton::new(tick.clone(), inner.ir_node.replace(HydroNode::Placeholder))
1364 }
1365
1366 /// Converts this singleton into a [`Stream`] containing a single element, the value.
1367 ///
1368 /// # Example
1369 /// ```rust
1370 /// # #[cfg(feature = "deploy")] {
1371 /// # use hydro_lang::prelude::*;
1372 /// # use futures::StreamExt;
1373 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1374 /// let tick = process.tick();
1375 /// let batch_input = process
1376 /// .source_iter(q!(vec![123, 456]))
1377 /// .batch(&tick, nondet!(/** test */));
1378 /// batch_input.clone().chain(
1379 /// batch_input.count().into_stream()
1380 /// ).all_ticks()
1381 /// # }, |mut stream| async move {
1382 /// // [123, 456, 2]
1383 /// # for w in vec![123, 456, 2] {
1384 /// # assert_eq!(stream.next().await.unwrap(), w);
1385 /// # }
1386 /// # }));
1387 /// # }
1388 /// ```
1389 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1390 where
1391 B: IsBounded,
1392 {
1393 Stream::new(
1394 self.location.clone(),
1395 HydroNode::Cast {
1396 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1397 metadata: self.location.new_node_metadata(Stream::<
1398 T,
1399 Tick<L>,
1400 Bounded,
1401 TotalOrder,
1402 ExactlyOnce,
1403 >::collection_kind()),
1404 },
1405 )
1406 }
1407
1408 /// Resolves the singleton's [`Future`] value by blocking until it completes,
1409 /// producing a singleton of the resolved output.
1410 ///
1411 /// This is useful when the singleton contains an async computation that must
1412 /// be awaited before further processing. The future is polled to completion
1413 /// before the output value is emitted.
1414 ///
1415 /// # Example
1416 /// ```rust
1417 /// # #[cfg(feature = "deploy")] {
1418 /// # use hydro_lang::prelude::*;
1419 /// # use futures::StreamExt;
1420 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1421 /// let tick = process.tick();
1422 /// let singleton = tick.singleton(q!(5));
1423 /// singleton
1424 /// .map(q!(|v| async move { v * 2 }))
1425 /// .resolve_future_blocking()
1426 /// .all_ticks()
1427 /// # }, |mut stream| async move {
1428 /// // 10
1429 /// # assert_eq!(stream.next().await.unwrap(), 10);
1430 /// # }));
1431 /// # }
1432 /// ```
1433 pub fn resolve_future_blocking(
1434 self,
1435 ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1436 where
1437 T: Future,
1438 B: IsBounded,
1439 {
1440 Singleton::new(
1441 self.location.clone(),
1442 HydroNode::ResolveFuturesBlocking {
1443 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1444 metadata: self
1445 .location
1446 .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1447 },
1448 )
1449 }
1450}
1451
1452impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1453where
1454 L: Location<'a>,
1455{
1456 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1457 /// which will stream the value computed in _each_ tick as a separate stream element.
1458 ///
1459 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1460 /// producing one element in the output for each tick. This is useful for batched computations,
1461 /// where the results from each tick must be combined together.
1462 ///
1463 /// # Example
1464 /// ```rust
1465 /// # #[cfg(feature = "deploy")] {
1466 /// # use hydro_lang::prelude::*;
1467 /// # use futures::StreamExt;
1468 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1469 /// let tick = process.tick();
1470 /// # // ticks are lazy by default, forces the second tick to run
1471 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1472 /// # let batch_first_tick = process
1473 /// # .source_iter(q!(vec![1]))
1474 /// # .batch(&tick, nondet!(/** test */));
1475 /// # let batch_second_tick = process
1476 /// # .source_iter(q!(vec![1, 2, 3]))
1477 /// # .batch(&tick, nondet!(/** test */))
1478 /// # .defer_tick(); // appears on the second tick
1479 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1480 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1481 /// .count()
1482 /// .all_ticks()
1483 /// # }, |mut stream| async move {
1484 /// // [1, 3]
1485 /// # for w in vec![1, 3] {
1486 /// # assert_eq!(stream.next().await.unwrap(), w);
1487 /// # }
1488 /// # }));
1489 /// # }
1490 /// ```
1491 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1492 self.into_stream().all_ticks()
1493 }
1494
1495 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1496 /// which will stream the value computed in _each_ tick as a separate stream element.
1497 ///
1498 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1499 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1500 /// singleton's [`Tick`] context.
1501 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1502 self.into_stream().all_ticks_atomic()
1503 }
1504
1505 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1506 /// be asynchronously updated with the latest value of the singleton inside the tick.
1507 ///
1508 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1509 /// tick that tracks the inner value. This is useful for getting the value as of the
1510 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1511 ///
1512 /// # Example
1513 /// ```rust
1514 /// # #[cfg(feature = "deploy")] {
1515 /// # use hydro_lang::prelude::*;
1516 /// # use futures::StreamExt;
1517 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1518 /// let tick = process.tick();
1519 /// # // ticks are lazy by default, forces the second tick to run
1520 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1521 /// # let batch_first_tick = process
1522 /// # .source_iter(q!(vec![1]))
1523 /// # .batch(&tick, nondet!(/** test */));
1524 /// # let batch_second_tick = process
1525 /// # .source_iter(q!(vec![1, 2, 3]))
1526 /// # .batch(&tick, nondet!(/** test */))
1527 /// # .defer_tick(); // appears on the second tick
1528 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1529 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1530 /// .count()
1531 /// .latest()
1532 /// # .sample_eager(nondet!(/** test */))
1533 /// # }, |mut stream| async move {
1534 /// // asynchronously changes from 1 ~> 3
1535 /// # for w in vec![1, 3] {
1536 /// # assert_eq!(stream.next().await.unwrap(), w);
1537 /// # }
1538 /// # }));
1539 /// # }
1540 /// ```
1541 pub fn latest(self) -> Singleton<T, L, Unbounded> {
1542 Singleton::new(
1543 self.location.outer().clone(),
1544 HydroNode::YieldConcat {
1545 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1546 metadata: self
1547 .location
1548 .outer()
1549 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1550 },
1551 )
1552 }
1553
1554 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1555 /// be updated with the latest value of the singleton inside the tick.
1556 ///
1557 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1558 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1559 /// singleton's [`Tick`] context.
1560 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1561 let out_location = Atomic {
1562 tick: self.location.clone(),
1563 };
1564 Singleton::new(
1565 out_location.clone(),
1566 HydroNode::YieldConcat {
1567 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1568 metadata: out_location
1569 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1570 },
1571 )
1572 }
1573}
1574
#[doc(hidden)]
/// Helper trait that determines the output collection type for [`Singleton::zip`].
///
/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
/// [`Singleton`].
///
/// `Other` is the type of the second input to the `zip`; the trait is sealed, so it is
/// only implemented within this crate.
#[sealed::sealed]
pub trait ZipResult<'a, Other> {
    /// The output collection type.
    type Out;
    /// The type of the tupled output value.
    type ElementType;
    /// The type of the other collection's value.
    type OtherType;
    /// The location where the tupled result will be materialized.
    type Location: Location<'a>;

    /// The location of the second input to the `zip`.
    fn other_location(other: &Other) -> Self::Location;
    /// The IR node of the second input to the `zip`, consuming that input.
    fn other_ir_node(other: Other) -> HydroNode;

    /// Constructs the output live collection given an IR node containing the zip result.
    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
}
1599
1600#[sealed::sealed]
1601impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1602where
1603 L: Location<'a>,
1604{
1605 type Out = Singleton<(T, U), L, B>;
1606 type ElementType = (T, U);
1607 type OtherType = U;
1608 type Location = L;
1609
1610 fn other_location(other: &Singleton<U, L, B>) -> L {
1611 other.location.clone()
1612 }
1613
1614 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1615 other.ir_node.replace(HydroNode::Placeholder)
1616 }
1617
1618 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1619 Singleton::new(
1620 location.clone(),
1621 HydroNode::Cast {
1622 inner: Box::new(ir_node),
1623 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1624 },
1625 )
1626 }
1627}
1628
/// Zipping a [`Singleton`] with an [`Optional`] (whose bound is the singleton's underlying
/// bound) produces an [`Optional`] of pairs.
#[sealed::sealed]
impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
    for Singleton<T, L, B>
where
    L: Location<'a>,
{
    type Out = Optional<(T, U), L, B::UnderlyingBound>;
    type ElementType = (T, U);
    type OtherType = U;
    type Location = L;

    /// Returns a copy of the optional's location.
    fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
        other.location.clone()
    }

    /// Takes the IR node out of the optional, leaving a placeholder behind.
    fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
        other.ir_node.replace(HydroNode::Placeholder)
    }

    /// Wraps the zipped IR node directly in an [`Optional`]; unlike the
    /// singleton-with-singleton case, no extra `Cast` node is added here.
    fn make(location: L, ir_node: HydroNode) -> Self::Out {
        Optional::new(location, ir_node)
    }
}
1652
1653#[cfg(test)]
1654mod tests {
1655 #[cfg(feature = "deploy")]
1656 use futures::{SinkExt, StreamExt};
1657 #[cfg(feature = "deploy")]
1658 use hydro_deploy::Deployment;
1659 #[cfg(any(feature = "deploy", feature = "sim"))]
1660 use stageleft::q;
1661
1662 #[cfg(any(feature = "deploy", feature = "sim"))]
1663 use crate::compile::builder::FlowBuilder;
1664 #[cfg(feature = "deploy")]
1665 use crate::live_collections::stream::ExactlyOnce;
1666 #[cfg(any(feature = "deploy", feature = "sim"))]
1667 use crate::location::Location;
1668 #[cfg(any(feature = "deploy", feature = "sim"))]
1669 use crate::nondet::nondet;
1670
// Checks that a singleton fed back through a tick cycle (with an initial value) always
// holds exactly one element: the count of its stream view must be 1 on every tick that is
// triggered by external input.
#[cfg(feature = "deploy")]
#[tokio::test]
async fn tick_cycle_cardinality() {
    let mut deployment = Deployment::new();

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let external = flow.external::<()>();

    let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

    let node_tick = node.tick();
    // Cycle seeded with the singleton `0`; it is closed below with the same singleton.
    let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
    // Count the elements of the cycled singleton, but only report the count in ticks where
    // an external input arrived.
    let counts = singleton
        .clone()
        .into_stream()
        .count()
        .filter_if(
            input
                .batch(&node_tick, nondet!(/** testing */))
                .first()
                .is_some(),
        )
        .all_ticks()
        .send_bincode_external(&external);
    complete_cycle.complete_next_tick(singleton);

    let nodes = flow
        .with_process(&node, deployment.Localhost())
        .with_external(&external, deployment.Localhost())
        .deploy(&mut deployment);

    deployment.deploy().await.unwrap();

    let mut tick_trigger = nodes.connect(input_send).await;
    let mut external_out = nodes.connect(counts).await;

    deployment.start().await.unwrap();

    // First triggered tick: the singleton holds exactly one element.
    tick_trigger.send(()).await.unwrap();

    assert_eq!(external_out.next().await.unwrap(), 1);

    // Second triggered tick: the cardinality is still one after going through the cycle.
    tick_trigger.send(()).await.unwrap();

    assert_eq!(external_out.next().await.unwrap(), 1);
}
1718
// Expected to panic: the assertion demands that the very first snapshot of the fold is
// the final sum (10). Presumably the exhaustive simulation also explores executions in
// which an intermediate fold state is observed first, which makes the assertion fail.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_fold_intermediate_states() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
    // Running sum of the source elements.
    let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    let batch = folded.snapshot(&tick, nondet!(/** test */));
    let out_recv = batch.all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        assert_eq!(out_recv.next().await.unwrap(), 10);
    });
}
1737
// Checks that every explored execution ends with the final sum (10) as the last snapshot,
// and pins the number of executions explored by the exhaustive simulation.
#[cfg(feature = "sim")]
#[test]
fn sim_fold_intermediate_state_count() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
    // Running sum of the source elements.
    let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    let batch = folded.snapshot(&tick, nondet!(/** test */));
    let out_recv = batch.all_ticks().sim_output();

    // `exhaustive` returns the number of simulated instances it ran.
    let instance_count = flow.sim().exhaustive(async || {
        let out = out_recv.collect::<Vec<_>>().await;
        assert_eq!(out.last(), Some(&10));
    });

    assert_eq!(
        instance_count,
        16 // 2^4 possible subsets of intermediates (including initial state)
    )
}
1761
// With an input that is driven manually by the test, the first snapshot must be the fold's
// initial state (0) and the next one must already reflect the value that was sent (123).
#[cfg(feature = "sim")]
#[test]
fn sim_fold_no_repeat_initial() {
    // check that we don't repeat the initial state of the fold in autonomous decisions

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (in_port, input) = node.sim_input();
    // Running sum of everything sent through `in_port`.
    let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    let batch = folded.snapshot(&tick, nondet!(/** test */));
    let out_recv = batch.all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        // Nothing has been sent yet, so only the initial state can be observed.
        assert_eq!(out_recv.next().await.unwrap(), 0);

        in_port.send(123);

        // The next snapshot must not be a repeat of the initial state.
        assert_eq!(out_recv.next().await.unwrap(), 123);
    });
}
1785
// Expected to panic: the test body panics itself once it observes an execution in which
// two consecutive outputs carry the same snapshot value (3), demonstrating that such an
// execution is reachable in the exhaustive simulation.
#[cfg(feature = "sim")]
#[test]
#[should_panic]
fn sim_fold_repeats_snapshots() {
    // when the tick is driven by a snapshot AND something else, the snapshot can
    // "stutter" and repeat the same state multiple times

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
    // Running sum of the source elements.
    let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    // Pair every batched source element with the fold's snapshot in the same tick.
    let batch = source
        .batch(&tick, nondet!(/** test */))
        .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
    let out_recv = batch.all_ticks().sim_output();

    flow.sim().exhaustive(async || {
        if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
        {
            panic!("repeated snapshot");
        }
    });
}
1812
// Pins the number of executions explored by the exhaustive simulation when a tick
// consumes both a stream batch and a snapshot of a fold over the same two-element source.
#[cfg(feature = "sim")]
#[test]
fn sim_fold_repeats_snapshots_count() {
    // check the number of instances
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
    // Running sum of the source elements.
    let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));

    let tick = node.tick();
    // Pair every batched source element with the fold's snapshot in the same tick.
    let batch = source
        .batch(&tick, nondet!(/** test */))
        .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
    let out_recv = batch.all_ticks().sim_output();

    // The outputs themselves are not inspected; only the instance count matters here.
    let count = flow.sim().exhaustive(async || {
        let _ = out_recv.collect::<Vec<_>>().await;
    });

    assert_eq!(count, 52);
    // don't have a combinatorial explanation for this number yet, but checked via logs
}
1836
/// Ensures that a top-level singleton has only one snapshot: exhaustive
/// simulation of a tick that snapshots it must report a count of exactly one.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_exhaustive() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let constant = node.singleton(q!(1));
    let tick = node.tick();
    let snapshot = constant.snapshot(&tick, nondet!(/** test */));
    let out_recv = snapshot.all_ticks().sim_output();

    let count = flow.sim().exhaustive(async || {
        // Drain the output; only the returned count matters here.
        let _: Vec<_> = out_recv.collect().await;
    });

    assert_eq!(count, 1);
}
1855
/// If a tick consumes a static snapshot and a stream batch, only the batch
/// requires space exploration; the static singleton adds no extra instances.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_join_count() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
    let tick = node.tick();
    let batched = source_iter.batch(&tick, nondet!(/** test */));
    let constant_in_tick = node.singleton(q!(123)).clone_into_tick(&tick);
    let out_recv = batched
        .cross_singleton(constant_in_tick)
        .all_ticks()
        .sim_output();

    let instance_count = flow.sim().exhaustive(async || {
        let _: Vec<_> = out_recv.collect().await;
    });

    // 2^4 ways to split up (including a possibly empty first batch)
    assert_eq!(instance_count, 16);
}
1881
/// Converting a top-level fold result into a stream must yield only the value
/// `10` (the sum of `1..=4`) in every explored execution.
#[cfg(feature = "sim")]
#[test]
fn top_level_singleton_into_stream_no_replay() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let numbers = node.source_iter(q!(vec![1, 2, 3, 4]));
    let total = numbers.fold(q!(|| 0), q!(|acc, x| *acc += x));

    let total_stream = total.into_stream();
    let out_recv = total_stream.sim_output();

    flow.sim().exhaustive(async || {
        out_recv.assert_yields_only([10]).await;
    });
}
1897
/// Zips a sliced singleton with a clone of itself inside `sliced!` and checks
/// that the last emitted pair is `(3, 3)`, with `exhaustive` returning 4.
#[cfg(feature = "sim")]
#[test]
fn inside_tick_singleton_zip() {
    use crate::live_collections::Stream;
    use crate::live_collections::sliced::sliced;

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let numbers: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
    let total = numbers.fold(q!(|| 0), q!(|acc, x| *acc += x));

    let out_recv = sliced! {
        let snap = use(total, nondet!(/** test */));
        let zipped = snap.clone().zip(snap);
        zipped.into_stream()
    }
    .sim_output();

    let count = flow.sim().exhaustive(async || {
        let emitted: Vec<_> = out_recv.collect().await;
        // Both halves of the final pair must hold the complete sum.
        assert_eq!(emitted.last(), Some(&(3, 3)));
    });

    assert_eq!(count, 4);
}
1923
/// Reproducer for a simulator hang triggered by `cross_singleton` on a
/// top-level unbounded stream (i.e. not inside `sliced!`), where the
/// exhaustive simulator hung after the first iteration.
///
/// Passing requires the simulation to finish, produce at least one response,
/// and return a count of one.
#[cfg(feature = "sim")]
#[test]
fn sim_cross_singleton_top_level_unbounded_hang() {
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let (cmd_port, commands) = node.sim_input::<String, _, _>();

    let constant = node.singleton(q!(123));

    // `cross_singleton` applied directly to the top-level stream is the bug trigger.
    let crossed = commands.cross_singleton(constant);

    // The crossed stream is exposed as the output without further operators.
    let resp_port = crossed.sim_output();

    let count = flow.sim().exhaustive(async || {
        cmd_port.send(String::from("abc"));

        let responses: Vec<_> = resp_port.collect().await;
        assert!(!responses.is_empty());
    });

    assert_eq!(count, 1);
}
1952
/// Checks that merely converting a top-level singleton to an `Unbounded`
/// singleton does not change the count returned by `exhaustive`.
#[cfg(feature = "sim")]
#[test]
fn sim_top_level_singleton_state_count() {
    let mut flow = FlowBuilder::new();
    let process = flow.process::<()>();

    let (cmd_port, commands) = process.sim_input();
    {
        // Before `From` was optimized, this conversion increased the
        // exhaustive inputs from 1 to 2.
        use super::Singleton;
        use crate::live_collections::boundedness::Unbounded;
        let _singleton: Singleton<_, _, Unbounded> = process.singleton(q!(false)).into();
    }
    let tick = process.tick();
    let batched = commands.batch(&tick, nondet!(/** */));
    let resp_port = batched.all_ticks().sim_output();

    let count = flow.sim().exhaustive(async || {
        cmd_port.send(());
        let _responses: Vec<_> = resp_port.collect().await;
    });

    assert_eq!(count, 1);
}
1977
/// Regression test for #2939, where the singleton mut access-group counter
/// reset per root.
///
/// Two sequential `by_mut` captures of the same singleton are consumed by
/// separate outputs; they should receive distinct access groups so that the
/// graph builds and runs successfully.
#[cfg(feature = "sim")]
#[test]
#[expect(unused_mut, reason = "sliced! macro generates mut bindings for state")]
fn sim_mut_access_group_across_roots() {
    use crate::live_collections::sliced::sliced;

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let numbers = node.source_iter(q!(vec![1i32, 2, 3]));

    let (first, second) = sliced! {
        let batch = use(numbers, nondet!(/** test */));
        let mut total = use::state(|l| l.singleton(q!(0i32)));
        let total_mut = total.by_mut();

        // First mutable capture of the shared total.
        let first = batch.clone().map(q!(|delta| {
            *total_mut += delta;
            *total_mut
        }));
        // Second mutable capture of the same total, staged afterwards.
        let second = batch.map(q!(|delta| {
            *total_mut += delta;
            *total_mut
        }));
        (first, second)
    };

    let first_recv = first.sim_output();
    let second_recv = second.sim_output();

    flow.sim().exhaustive(async || {
        // Only successful construction and execution is checked: the exact
        // values depend on ordering, so the collected results are discarded.
        let _first: Vec<i32> = first_recv.collect().await;
        let _second: Vec<i32> = second_recv.collect().await;
    });
}
2018
/// Regression test for #2940: access groups must follow code (staging) order
/// rather than IR traversal order.
///
/// `second.chain(first)` consumes the two mapped streams in the reverse of
/// their definition order, yet the mutations must still execute in the order
/// in which they were staged.
#[cfg(feature = "sim")]
#[test]
#[expect(unused_mut, reason = "sliced! macro generates mut bindings for state")]
fn sim_mut_access_groups_follow_code_order() {
    use crate::live_collections::sliced::sliced;

    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();

    let numbers = node.source_iter(q!(vec![3i32]));

    let out_recv = sliced! {
        let batch = use(numbers, nondet!(/** test */));
        let mut total = use::state(|l| l.singleton(q!(0i32)));
        let total_mut = total.by_mut();

        // Staged FIRST: add the element to the total.
        let first = batch.clone().map(q!(|delta| {
            *total_mut += delta;
            *total_mut
        }));
        // Staged SECOND: double the total.
        let second = batch.map(q!(|_delta| {
            *total_mut *= 2;
            *total_mut
        }));
        // Chained in the OPPOSITE order of definition; this must not affect
        // the order in which the mutations run.
        second.chain(first)
    }
    .sim_output();

    flow.sim().exhaustive(async || {
        let results: Vec<i32> = out_recv.collect().await;
        // Code-order semantics: `first` runs (total = 0 + 3 = 3), then `second`
        // runs (total = 3 * 2 = 6). The output of `second.chain(first)` is
        // therefore [6, 3].
        let expected = vec![6, 3];
        assert_eq!(results, expected);
    });
}
2060}