hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use sealed::sealed;
11use stageleft::{IntoQuotedMut, QuotedWithContext, q};
12
13use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
14use super::keyed_stream::KeyedStream;
15use super::optional::Optional;
16use super::singleton::Singleton;
17use super::sliced::sliced;
18use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
19use crate::compile::builder::{CycleId, FlowState};
20use crate::compile::ir::{
21 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, SharedNode,
22};
23#[cfg(stageleft_runtime)]
24use crate::forward_handle::{CycleCollection, ReceiverComplete};
25use crate::forward_handle::{ForwardRef, TickCycle};
26use crate::live_collections::stream::{Ordering, Retries};
27#[cfg(stageleft_runtime)]
28use crate::location::dynamic::{DynLocation, LocationId};
29use crate::location::tick::DeferTick;
30use crate::location::{Atomic, Location, Tick, check_matching_location};
31use crate::manual_expr::ManualExpr;
32use crate::nondet::{NonDet, nondet};
33use crate::properties::manual_proof;
34
35/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
36///
37/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
38/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
39/// indicates that entries may be added over time, but once an entry is added it will never be
40/// removed and its value will never change.
41pub trait KeyedSingletonBound {
42 /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
43 type UnderlyingBound: Boundedness;
44 /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
45 type ValueBound: Boundedness;
46
47 /// The type of the keyed singleton if the value for each key is immutable.
48 type WithBoundedValue: KeyedSingletonBound<
49 UnderlyingBound = Self::UnderlyingBound,
50 ValueBound = Bounded,
51 EraseMonotonic = Self::WithBoundedValue,
52 >;
53
54 /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`KeyedStream`] with [`Self`] boundedness.
55 type KeyedStreamToMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
56
57 /// The [`Boundedness`] of the keyed singleton produced by folding a [`KeyedStream`] with
58 /// [`Self`] boundedness when the aggregation does *not* have a monotonicity proof.
59 ///
60 /// Without a monotonicity proof, the per-key values may change arbitrarily, so an unbounded
61 /// input collapses to [`MonotonicKeys`] (keys are still only added, never removed).
62 type KeyedStreamToNonMonotone: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
63
64 /// The type of the keyed singleton if the value for each key is no longer monotonic.
65 type EraseMonotonic: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Self::ValueBound>;
66
67 /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
68 fn bound_kind() -> KeyedSingletonBoundKind;
69}
70
71impl KeyedSingletonBound for Unbounded {
72 type UnderlyingBound = Unbounded;
73 type ValueBound = Unbounded;
74 type WithBoundedValue = BoundedValue;
75 type KeyedStreamToMonotone = MonotonicValue;
76 type KeyedStreamToNonMonotone = MonotonicKeys;
77 type EraseMonotonic = Unbounded;
78
79 fn bound_kind() -> KeyedSingletonBoundKind {
80 KeyedSingletonBoundKind::Unbounded
81 }
82}
83
84impl KeyedSingletonBound for Bounded {
85 type UnderlyingBound = Bounded;
86 type ValueBound = Bounded;
87 type WithBoundedValue = Bounded;
88 type KeyedStreamToMonotone = Bounded;
89 type KeyedStreamToNonMonotone = Bounded;
90 type EraseMonotonic = Bounded;
91
92 fn bound_kind() -> KeyedSingletonBoundKind {
93 KeyedSingletonBoundKind::Bounded
94 }
95}
96
97/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
98/// its value is bounded and will never change, but new entries may appear asynchronously
99pub struct BoundedValue;
100
101impl KeyedSingletonBound for BoundedValue {
102 type UnderlyingBound = Unbounded;
103 type ValueBound = Bounded;
104 type WithBoundedValue = BoundedValue;
105 type KeyedStreamToMonotone = BoundedValue;
106 type KeyedStreamToNonMonotone = BoundedValue;
107 type EraseMonotonic = BoundedValue;
108
109 fn bound_kind() -> KeyedSingletonBoundKind {
110 KeyedSingletonBoundKind::BoundedValue
111 }
112}
113
114/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
115/// it will never be removed, and the corresponding value will only increase monotonically.
116pub struct MonotonicValue;
117
118impl KeyedSingletonBound for MonotonicValue {
119 type UnderlyingBound = Unbounded;
120 type ValueBound = Unbounded;
121 type WithBoundedValue = BoundedValue;
122 type KeyedStreamToMonotone = MonotonicValue;
123 type KeyedStreamToNonMonotone = MonotonicKeys;
124 type EraseMonotonic = MonotonicKeys;
125
126 fn bound_kind() -> KeyedSingletonBoundKind {
127 KeyedSingletonBoundKind::MonotonicValue
128 }
129}
130
131/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key
132/// appears, it will never be removed, but the corresponding value may change arbitrarily.
133pub struct MonotonicKeys;
134
135impl KeyedSingletonBound for MonotonicKeys {
136 type UnderlyingBound = Unbounded;
137 type ValueBound = Unbounded;
138 type WithBoundedValue = BoundedValue;
139 type KeyedStreamToMonotone = MonotonicKeys;
140 type KeyedStreamToNonMonotone = MonotonicKeys;
141 type EraseMonotonic = MonotonicKeys;
142
143 fn bound_kind() -> KeyedSingletonBoundKind {
144 KeyedSingletonBoundKind::MonotonicKeys
145 }
146}
147
148#[sealed]
149#[diagnostic::on_unimplemented(
150 message = "The keyed singleton must have monotonic values (`MonotonicValue`) or be bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
151 label = "required here",
152 note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
153)]
154/// Marker trait that is implemented for [`KeyedSingletonBound`] types whose per-key values
155/// are monotonically non-decreasing (or bounded).
156pub trait IsKeyedMonotonic: KeyedSingletonBound {}
157
158#[sealed]
159#[diagnostic::do_not_recommend]
160impl IsKeyedMonotonic for MonotonicValue {}
161
162#[sealed]
163#[diagnostic::do_not_recommend]
164impl IsKeyedMonotonic for BoundedValue {}
165
166#[sealed]
167#[diagnostic::do_not_recommend]
168impl<B: IsBounded + KeyedSingletonBound> IsKeyedMonotonic for B {}
169
170/// Mapping from keys of type `K` to values of type `V`.
171///
172/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
173/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
174/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
175/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
176/// keys cannot be removed and the value for each key is immutable.
177///
178/// Type Parameters:
179/// - `K`: the type of the key for each entry
180/// - `V`: the type of the value for each entry
181/// - `Loc`: the [`Location`] where the keyed singleton is materialized
182/// - `Bound`: tracks whether the entries are:
183/// - [`Bounded`] (local and finite)
184/// - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
185/// - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
186pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
187 pub(crate) location: Loc,
188 pub(crate) ir_node: RefCell<HydroNode>,
189 pub(crate) flow_state: FlowState,
190
191 _phantom: PhantomData<(K, V, Loc, Bound)>,
192}
193
194impl<K, V, L, B: KeyedSingletonBound> Drop for KeyedSingleton<K, V, L, B> {
195 fn drop(&mut self) {
196 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
197 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
198 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
199 input: Box::new(ir_node),
200 op_metadata: HydroIrOpMetadata::new(),
201 });
202 }
203 }
204}
205
206impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
207 for KeyedSingleton<K, V, Loc, Bound>
208{
209 fn clone(&self) -> Self {
210 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
211 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
212 *self.ir_node.borrow_mut() = HydroNode::Tee {
213 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
214 metadata: self.location.new_node_metadata(Self::collection_kind()),
215 };
216 }
217
218 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
219 KeyedSingleton {
220 location: self.location.clone(),
221 flow_state: self.flow_state.clone(),
222 ir_node: HydroNode::Tee {
223 inner: SharedNode(inner.0.clone()),
224 metadata: metadata.clone(),
225 }
226 .into(),
227 _phantom: PhantomData,
228 }
229 } else {
230 unreachable!()
231 }
232 }
233}
234
235impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
236 for KeyedSingleton<K, V, L, B>
237where
238 L: Location<'a>,
239{
240 type Location = L;
241
242 fn create_source(cycle_id: CycleId, location: L) -> Self {
243 KeyedSingleton {
244 flow_state: location.flow_state().clone(),
245 location: location.clone(),
246 ir_node: RefCell::new(HydroNode::CycleSource {
247 cycle_id,
248 metadata: location.new_node_metadata(Self::collection_kind()),
249 }),
250 _phantom: PhantomData,
251 }
252 }
253}
254
255impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
256where
257 L: Location<'a>,
258{
259 type Location = Tick<L>;
260
261 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
262 KeyedSingleton::new(
263 location.clone(),
264 HydroNode::CycleSource {
265 cycle_id,
266 metadata: location.new_node_metadata(Self::collection_kind()),
267 },
268 )
269 }
270}
271
272impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
273where
274 L: Location<'a>,
275{
276 fn defer_tick(self) -> Self {
277 KeyedSingleton::defer_tick(self)
278 }
279}
280
281impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
282 for KeyedSingleton<K, V, L, B>
283where
284 L: Location<'a>,
285{
286 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
287 assert_eq!(
288 Location::id(&self.location),
289 expected_location,
290 "locations do not match"
291 );
292 self.location
293 .flow_state()
294 .borrow_mut()
295 .push_root(HydroRoot::CycleSink {
296 cycle_id,
297 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
298 op_metadata: HydroIrOpMetadata::new(),
299 });
300 }
301}
302
303impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
304where
305 L: Location<'a>,
306{
307 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
308 assert_eq!(
309 Location::id(&self.location),
310 expected_location,
311 "locations do not match"
312 );
313 self.location
314 .flow_state()
315 .borrow_mut()
316 .push_root(HydroRoot::CycleSink {
317 cycle_id,
318 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
319 op_metadata: HydroIrOpMetadata::new(),
320 });
321 }
322}
323
324impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
325 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
326 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
327 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
328
329 let flow_state = location.flow_state().clone();
330 KeyedSingleton {
331 location,
332 flow_state,
333 ir_node: RefCell::new(ir_node),
334 _phantom: PhantomData,
335 }
336 }
337
338 /// Returns the [`Location`] where this keyed singleton is being materialized.
339 pub fn location(&self) -> &L {
340 &self.location
341 }
342
343 /// Weakens the consistency of this live collection to not guarantee any consistency across
344 /// cluster members (if this collection is on a cluster).
345 pub fn weaken_consistency(self) -> KeyedSingleton<K, V, L::DropConsistency, B>
346 where
347 L: Location<'a>,
348 {
349 if L::consistency()
350 .is_none_or(|c| c == crate::location::dynamic::ClusterConsistency::NoConsistency)
351 {
352 // already no consistency
353 KeyedSingleton::new(
354 self.location.drop_consistency(),
355 self.ir_node.replace(HydroNode::Placeholder),
356 )
357 } else {
358 KeyedSingleton::new(
359 self.location.drop_consistency(),
360 HydroNode::Cast {
361 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
362 metadata: self
363 .location
364 .drop_consistency()
365 .new_node_metadata(
366 KeyedSingleton::<K, V, L::DropConsistency, B>::collection_kind(),
367 ),
368 },
369 )
370 }
371 }
372
373 /// Casts this live collection to have the consistency guarantees specified in the given
374 /// location type parameter. The developer must ensure that the strengthened consistency
375 /// is actually guaranteed, via the proof field (see [`crate::prelude::manual_proof`]).
376 pub fn assert_has_consistency_of<L2: Location<'a, DropConsistency = L::DropConsistency>>(
377 self,
378 _proof: impl crate::properties::ConsistencyProof,
379 ) -> KeyedSingleton<K, V, L2, B>
380 where
381 L: Location<'a>,
382 {
383 if L::consistency() == L2::consistency() {
384 // already consistent
385 KeyedSingleton::new(
386 self.location.with_consistency_of(),
387 self.ir_node.replace(HydroNode::Placeholder),
388 )
389 } else {
390 KeyedSingleton::new(
391 self.location.with_consistency_of(),
392 HydroNode::AssertIsConsistent {
393 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
394 trusted: false,
395 metadata: self
396 .location
397 .clone()
398 .with_consistency_of::<L2>()
399 .new_node_metadata(KeyedSingleton::<K, V, L2, B>::collection_kind()),
400 },
401 )
402 }
403 }
404}
405
/// Counts the entries of a bounded keyed singleton (e.g. a per-tick snapshot), producing a
/// bounded singleton at the same location.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let entries = me.entries();
    entries.count()
}
412
/// Folds a bounded keyed singleton (e.g. a per-tick snapshot) into a single `HashMap` from keys
/// to values, producing a bounded singleton at the same location.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let ordered_entries = me.entries().assume_ordering_trusted(nondet!(
        /// There is only one element associated with each key. The closure technically
        /// isn't commutative in the case where both passed entries have the same key
        /// but different values.
        ///
        /// In the future, we may want to have an `assume!(...)` statement in the UDF that
        /// the key is never already present in the map.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
436
437impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
438 pub(crate) fn collection_kind() -> CollectionKind {
439 CollectionKind::KeyedSingleton {
440 bound: B::bound_kind(),
441 key_type: stageleft::quote_type::<K>().into(),
442 value_type: stageleft::quote_type::<V>().into(),
443 }
444 }
445
446 /// Transforms each value by invoking `f` on each element, with keys staying the same
447 /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
448 ///
449 /// If you do not want to modify the stream and instead only want to view
450 /// each item use [`KeyedSingleton::inspect`] instead.
451 ///
452 /// # Example
453 /// ```rust
454 /// # #[cfg(feature = "deploy")] {
455 /// # use hydro_lang::prelude::*;
456 /// # use futures::StreamExt;
457 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
458 /// let keyed_singleton = // { 1: 2, 2: 4 }
459 /// # process
460 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
461 /// # .into_keyed()
462 /// # .first();
463 /// keyed_singleton.map(q!(|v| v + 1))
464 /// # .entries()
465 /// # }, |mut stream| async move {
466 /// // { 1: 3, 2: 5 }
467 /// # let mut results = Vec::new();
468 /// # for _ in 0..2 {
469 /// # results.push(stream.next().await.unwrap());
470 /// # }
471 /// # results.sort();
472 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
473 /// # }));
474 /// # }
475 /// ```
476 pub fn map<U, F>(
477 self,
478 f: impl IntoQuotedMut<'a, F, L> + Copy,
479 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
480 where
481 F: Fn(V) -> U + 'a,
482 {
483 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
484 let map_f = q!({
485 let orig = f;
486 move |(k, v)| (k, orig(v))
487 })
488 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
489 .into();
490
491 KeyedSingleton::new(
492 self.location.clone(),
493 HydroNode::Map {
494 f: map_f,
495 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
496 metadata: self.location.new_node_metadata(KeyedSingleton::<
497 K,
498 U,
499 L,
500 B::EraseMonotonic,
501 >::collection_kind()),
502 },
503 )
504 }
505
506 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
507 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
508 ///
509 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
510 /// the new value `U`. The key remains unchanged in the output.
511 ///
512 /// # Example
513 /// ```rust
514 /// # #[cfg(feature = "deploy")] {
515 /// # use hydro_lang::prelude::*;
516 /// # use futures::StreamExt;
517 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
518 /// let keyed_singleton = // { 1: 2, 2: 4 }
519 /// # process
520 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
521 /// # .into_keyed()
522 /// # .first();
523 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
524 /// # .entries()
525 /// # }, |mut stream| async move {
526 /// // { 1: 3, 2: 6 }
527 /// # let mut results = Vec::new();
528 /// # for _ in 0..2 {
529 /// # results.push(stream.next().await.unwrap());
530 /// # }
531 /// # results.sort();
532 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
533 /// # }));
534 /// # }
535 /// ```
536 pub fn map_with_key<U, F>(
537 self,
538 f: impl IntoQuotedMut<'a, F, L> + Copy,
539 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
540 where
541 F: Fn((K, V)) -> U + 'a,
542 K: Clone,
543 {
544 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
545 let map_f = q!({
546 let orig = f;
547 move |(k, v)| {
548 let out = orig((Clone::clone(&k), v));
549 (k, out)
550 }
551 })
552 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
553 .into();
554
555 KeyedSingleton::new(
556 self.location.clone(),
557 HydroNode::Map {
558 f: map_f,
559 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
560 metadata: self.location.new_node_metadata(KeyedSingleton::<
561 K,
562 U,
563 L,
564 B::EraseMonotonic,
565 >::collection_kind()),
566 },
567 )
568 }
569
570 /// Gets the number of keys in the keyed singleton.
571 ///
572 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
573 /// since keys may be added / removed over time. When the set of keys changes, the count will
574 /// be asynchronously updated.
575 ///
576 /// # Example
577 /// ```rust
578 /// # #[cfg(feature = "deploy")] {
579 /// # use hydro_lang::prelude::*;
580 /// # use futures::StreamExt;
581 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
582 /// # let tick = process.tick();
583 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
584 /// # process
585 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
586 /// # .into_keyed()
587 /// # .batch(&tick, nondet!(/** test */))
588 /// # .first();
589 /// keyed_singleton.key_count()
590 /// # .all_ticks()
591 /// # }, |mut stream| async move {
592 /// // 3
593 /// # assert_eq!(stream.next().await.unwrap(), 3);
594 /// # }));
595 /// # }
596 /// ```
597 pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
598 if B::ValueBound::BOUNDED {
599 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
600 location: self.location.clone(),
601 flow_state: self.flow_state.clone(),
602 ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
603 _phantom: PhantomData,
604 };
605
606 me.entries().count().ignore_monotonic()
607 } else if L::is_top_level()
608 && let Some(tick) = self.location.try_tick()
609 && (B::bound_kind() == KeyedSingletonBoundKind::Unbounded
610 || B::bound_kind() == KeyedSingletonBoundKind::MonotonicKeys
611 || B::bound_kind() == KeyedSingletonBoundKind::MonotonicValue)
612 {
613 let location = self.location.clone();
614 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
615 let me: KeyedSingleton<K, V, L, MonotonicKeys> =
616 KeyedSingleton::new(location.clone(), ir_node);
617
618 let out =
619 key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
620 .latest();
621 Singleton::new(location, out.ir_node.replace(HydroNode::Placeholder))
622 } else {
623 panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
624 }
625 }
626
627 /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
628 ///
629 /// As the values for each key are updated asynchronously, the `HashMap` will be updated
630 /// asynchronously as well.
631 ///
632 /// # Example
633 /// ```rust
634 /// # #[cfg(feature = "deploy")] {
635 /// # use hydro_lang::prelude::*;
636 /// # use futures::StreamExt;
637 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
638 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
639 /// # process
640 /// # .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
641 /// # .into_keyed()
642 /// # .batch(&process.tick(), nondet!(/** test */))
643 /// # .first();
644 /// keyed_singleton.into_singleton()
645 /// # .all_ticks()
646 /// # }, |mut stream| async move {
647 /// // { 1: "a", 2: "b", 3: "c" }
648 /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
649 /// # }));
650 /// # }
651 /// ```
652 pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
653 where
654 K: Eq + Hash,
655 {
656 if B::ValueBound::BOUNDED {
657 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
658 location: self.location.clone(),
659 flow_state: self.flow_state.clone(),
660 ir_node: RefCell::new(self.ir_node.replace(HydroNode::Placeholder)),
661 _phantom: PhantomData,
662 };
663
664 me.entries()
665 .assume_ordering_trusted(nondet!(
666 /// There is only one element associated with each key. The closure technically
667 /// isn't commutative in the case where both passed entries have the same key
668 /// but different values.
669 ///
670 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
671 /// the key is never already present in the map.
672 ))
673 .fold(
674 q!(|| HashMap::new()),
675 q!(|map, (k, v)| {
676 // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
677 map.insert(k, v);
678 }),
679 )
680 } else if L::is_top_level()
681 && let Some(tick) = self.location.try_tick()
682 && (B::bound_kind() == KeyedSingletonBoundKind::Unbounded
683 || B::bound_kind() == KeyedSingletonBoundKind::MonotonicKeys
684 || B::bound_kind() == KeyedSingletonBoundKind::MonotonicValue)
685 {
686 let location = self.location.clone();
687 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
688 let me: KeyedSingleton<K, V, L, MonotonicKeys> =
689 KeyedSingleton::new(location.clone(), ir_node);
690
691 let out = into_singleton_inside_tick(
692 me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
693 )
694 .latest();
695 Singleton::new(location, out.ir_node.replace(HydroNode::Placeholder))
696 } else {
697 panic!("BoundedValue or Unbounded KeyedSingleton inside a tick, not supported");
698 }
699 }
700
701 /// An operator which allows you to "name" a `HydroNode`.
702 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
703 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
704 {
705 let mut node = self.ir_node.borrow_mut();
706 let metadata = node.metadata_mut();
707 metadata.tag = Some(name.to_owned());
708 }
709 self
710 }
711
712 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
713 /// implies that `B == Bounded`.
714 pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
715 where
716 B: IsBounded,
717 {
718 KeyedSingleton::new(
719 self.location.clone(),
720 self.ir_node.replace(HydroNode::Placeholder),
721 )
722 }
723
724 /// Gets the value associated with a specific key from the keyed singleton.
725 /// Returns `None` if the key is `None` or there is no associated value.
726 ///
727 /// # Example
728 /// ```rust
729 /// # #[cfg(feature = "deploy")] {
730 /// # use hydro_lang::prelude::*;
731 /// # use futures::StreamExt;
732 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
733 /// let tick = process.tick();
734 /// let keyed_data = process
735 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
736 /// .into_keyed()
737 /// .batch(&tick, nondet!(/** test */))
738 /// .first();
739 /// let key = tick.singleton(q!(1));
740 /// keyed_data.get(key).all_ticks()
741 /// # }, |mut stream| async move {
742 /// // 2
743 /// # assert_eq!(stream.next().await.unwrap(), 2);
744 /// # }));
745 /// # }
746 /// ```
747 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
748 where
749 B: IsBounded,
750 K: Hash + Eq + Clone,
751 V: Clone,
752 {
753 self.make_bounded()
754 .into_keyed_stream()
755 .get(key)
756 .cast_at_most_one_element()
757 }
758
759 /// Emit a keyed stream containing keys shared between the keyed singleton and the
760 /// keyed stream, where each value in the output keyed stream is a tuple of
761 /// (the keyed singleton's value, the keyed stream's value).
762 ///
763 /// # Example
764 /// ```rust
765 /// # #[cfg(feature = "deploy")] {
766 /// # use hydro_lang::prelude::*;
767 /// # use futures::StreamExt;
768 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
769 /// let tick = process.tick();
770 /// let keyed_data = process
771 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
772 /// .into_keyed()
773 /// .batch(&tick, nondet!(/** test */))
774 /// .first();
775 /// let other_data = process
776 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
777 /// .into_keyed()
778 /// .batch(&tick, nondet!(/** test */));
779 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
780 /// # }, |mut stream| async move {
781 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
782 /// # let mut results = vec![];
783 /// # for _ in 0..3 {
784 /// # results.push(stream.next().await.unwrap());
785 /// # }
786 /// # results.sort();
787 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
788 /// # }));
789 /// # }
790 /// ```
791 pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2, B2: Boundedness>(
792 self,
793 other: KeyedStream<K, V2, L, B2, O2, R2>,
794 ) -> KeyedStream<K, (V, V2), L, B2, O2, R2>
795 where
796 B: IsBounded,
797 K: Eq + Hash + Clone,
798 V: Clone,
799 V2: Clone,
800 {
801 // TODO(shadaj): if DFIR guarantees that joining unbounded keyed stream x bounded keyed stream
802 // always produces deterministic order per key (nested loop join), this could just use
803 // `join_keyed_stream` without constructing IRs manually
804 KeyedStream::new(
805 self.location.clone(),
806 HydroNode::Join {
807 left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
808 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
809 metadata: self
810 .location
811 .new_node_metadata(KeyedStream::<K, (V, V2), L, B2, O2, R2>::collection_kind()),
812 },
813 )
814 }
815
816 /// Emit a keyed singleton containing all keys shared between two keyed singletons,
817 /// where each value in the output keyed singleton is a tuple of
818 /// (self.value, other.value).
819 ///
820 /// # Example
821 /// ```rust
822 /// # #[cfg(feature = "deploy")] {
823 /// # use hydro_lang::prelude::*;
824 /// # use futures::StreamExt;
825 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
826 /// # let tick = process.tick();
827 /// let requests = // { 1: 10, 2: 20, 3: 30 }
828 /// # process
829 /// # .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
830 /// # .into_keyed()
831 /// # .batch(&tick, nondet!(/** test */))
832 /// # .first();
833 /// let other = // { 1: 100, 2: 200, 4: 400 }
834 /// # process
835 /// # .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
836 /// # .into_keyed()
837 /// # .batch(&tick, nondet!(/** test */))
838 /// # .first();
839 /// requests.join_keyed_singleton(other)
840 /// # .entries().all_ticks()
841 /// # }, |mut stream| async move {
842 /// // { 1: (10, 100), 2: (20, 200) }
843 /// # let mut results = vec![];
844 /// # for _ in 0..2 {
845 /// # results.push(stream.next().await.unwrap());
846 /// # }
847 /// # results.sort();
848 /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
849 /// # }));
850 /// # }
851 /// ```
852 pub fn join_keyed_singleton<V2: Clone>(
853 self,
854 other: KeyedSingleton<K, V2, L, Bounded>,
855 ) -> KeyedSingleton<K, (V, V2), L, Bounded>
856 where
857 B: IsBounded,
858 K: Eq + Hash + Clone,
859 V: Clone,
860 {
861 let result_stream = self
862 .make_bounded()
863 .entries()
864 .join(other.entries())
865 .into_keyed();
866
867 // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
868 result_stream.cast_at_most_one_entry_per_key()
869 }
870
871 /// For each value in `self`, find the matching key in `lookup`.
872 /// The output is a keyed singleton with the key from `self`, and a value
873 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
874 /// If the key is not present in `lookup`, the option will be [`None`].
875 ///
876 /// # Example
877 /// ```rust
878 /// # #[cfg(feature = "deploy")] {
879 /// # use hydro_lang::prelude::*;
880 /// # use futures::StreamExt;
881 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
882 /// # let tick = process.tick();
883 /// let requests = // { 1: 10, 2: 20 }
884 /// # process
885 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
886 /// # .into_keyed()
887 /// # .batch(&tick, nondet!(/** test */))
888 /// # .first();
889 /// let other_data = // { 10: 100, 11: 110 }
890 /// # process
891 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
892 /// # .into_keyed()
893 /// # .batch(&tick, nondet!(/** test */))
894 /// # .first();
895 /// requests.lookup_keyed_singleton(other_data)
896 /// # .entries().all_ticks()
897 /// # }, |mut stream| async move {
898 /// // { 1: (10, Some(100)), 2: (20, None) }
899 /// # let mut results = vec![];
900 /// # for _ in 0..2 {
901 /// # results.push(stream.next().await.unwrap());
902 /// # }
903 /// # results.sort();
904 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
905 /// # }));
906 /// # }
907 /// ```
908 pub fn lookup_keyed_singleton<V2>(
909 self,
910 lookup: KeyedSingleton<V, V2, L, Bounded>,
911 ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
912 where
913 B: IsBounded,
914 K: Eq + Hash + Clone,
915 V: Eq + Hash + Clone,
916 V2: Clone,
917 {
918 let result_stream = self
919 .make_bounded()
920 .into_keyed_stream()
921 .lookup_keyed_stream(lookup.into_keyed_stream());
922
923 // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
924 result_stream.cast_at_most_one_entry_per_key()
925 }
926
927 /// For each value in `self`, find the matching key in `lookup`.
928 /// The output is a keyed stream with the key from `self`, and a value
929 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
930 /// If the key is not present in `lookup`, the option will be [`None`].
931 ///
932 /// # Example
933 /// ```rust
934 /// # #[cfg(feature = "deploy")] {
935 /// # use hydro_lang::prelude::*;
936 /// # use futures::StreamExt;
937 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
938 /// # let tick = process.tick();
939 /// let requests = // { 1: 10, 2: 20 }
940 /// # process
941 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
942 /// # .into_keyed()
943 /// # .batch(&tick, nondet!(/** test */))
944 /// # .first();
945 /// let other_data = // { 10: 100, 10: 110 }
946 /// # process
947 /// # .source_iter(q!(vec![(10, 100), (10, 110)]))
948 /// # .into_keyed()
949 /// # .batch(&tick, nondet!(/** test */));
950 /// requests.lookup_keyed_stream(other_data)
951 /// # .entries().all_ticks()
952 /// # }, |mut stream| async move {
953 /// // { 1: [(10, Some(100)), (10, Some(110))], 2: (20, None) }
954 /// # let mut results = vec![];
955 /// # for _ in 0..3 {
956 /// # results.push(stream.next().await.unwrap());
957 /// # }
958 /// # results.sort();
959 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
960 /// # }));
961 /// # }
962 /// ```
963 pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
964 self,
965 lookup: KeyedStream<V, V2, L, Bounded, O, R>,
966 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
967 where
968 B: IsBounded,
969 K: Eq + Hash + Clone,
970 V: Eq + Hash + Clone,
971 V2: Clone,
972 {
973 self.make_bounded()
974 .entries()
975 .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
976 .into_keyed()
977 .lookup_keyed_stream(lookup)
978 }
979
980 /// For each key present in both `self` and `thresholds`, emits a [`KeyedStream`] event the first
981 /// time that key's value becomes greater than or equal to the corresponding threshold value.
982 /// The emitted value for each key is the threshold value itself.
983 ///
984 /// This requires the keyed singleton to have monotonic values ([`MonotonicValue`] or [`Bounded`]),
985 /// because otherwise the threshold detection would be non-deterministic.
986 ///
987 /// The `thresholds` parameter is a [`BoundedValue`] keyed singleton mapping each key to its
988 /// threshold. Thresholds may arrive asynchronously (new keys appear over time), but once set
989 /// for a key, the threshold value is fixed. Late-arriving thresholds are checked against the
990 /// current snapshot value immediately.
991 ///
992 /// # Example
993 /// ```rust,ignore
994 /// use hydro_lang::prelude::*;
995 ///
996 /// // Given a monotonically increasing keyed singleton (e.g. from fold with monotone proof)
997 /// let counts: KeyedSingleton<u32, usize, _, MonotonicValue> = events.into_keyed()
998 /// .fold(q!(|| 0), q!(|acc, _| *acc += 1, monotone = manual_proof!(/** +1 is monotone */)));
999 ///
1000 /// // BoundedValue keyed singleton of thresholds (from .first())
1001 /// let thresholds = threshold_source.into_keyed().first();
1002 ///
1003 /// // Emits (key, threshold_value) the first time each key's value >= threshold
1004 /// let crossed = counts.threshold_greater_or_equal(thresholds);
1005 /// ```
1006 pub fn threshold_greater_or_equal(
1007 self,
1008 thresholds: KeyedSingleton<K, V, L, BoundedValue>,
1009 ) -> KeyedStream<K, V, L, B::UnderlyingBound, NoOrder, ExactlyOnce>
1010 where
1011 K: Clone + Eq + Hash,
1012 V: Clone + PartialOrd,
1013 B: IsKeyedMonotonic,
1014 {
1015 let self_location = self.location.clone();
1016 match B::bound_kind() {
1017 KeyedSingletonBoundKind::Bounded => {
1018 // Bounded case: self is already fixed, just join and filter
1019 let me: KeyedSingleton<K, V, L, Bounded> = KeyedSingleton::new(
1020 self.location.clone(),
1021 self.ir_node.replace(HydroNode::Placeholder),
1022 );
1023 let result = me
1024 .entries()
1025 .join(thresholds.entries())
1026 .filter_map(q!(|(k, (val, thresh))| {
1027 if val >= thresh {
1028 Some((k, thresh))
1029 } else {
1030 None
1031 }
1032 }))
1033 .into_keyed();
1034 KeyedStream::new(
1035 result.location.clone(),
1036 result.ir_node.replace(HydroNode::Placeholder),
1037 )
1038 }
1039 KeyedSingletonBoundKind::MonotonicValue => {
1040 let me: KeyedSingleton<K, V, L, MonotonicValue> = KeyedSingleton::new(
1041 self.location.clone(),
1042 self.ir_node.replace(HydroNode::Placeholder),
1043 );
1044
1045 let result = sliced! {
1046 let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1047 let thresh_snapshot =
1048 use(thresholds, nondet!(/** thresholds are deterministic */));
1049 let mut already_crossed =
1050 use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1051
1052 let joined = thresh_snapshot.entries().join(snapshot.entries());
1053 let passed = joined
1054 .filter(q!(|(_, (thresh, val))| *val >= *thresh))
1055 .map(q!(|(k, (thresh, _))| (k, thresh)));
1056
1057 let newly_crossed = passed.anti_join(already_crossed.clone());
1058 already_crossed =
1059 already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1060
1061 newly_crossed.into_keyed()
1062 };
1063
1064 KeyedStream::new(
1065 self_location,
1066 result.ir_node.replace(HydroNode::Placeholder),
1067 )
1068 }
1069 KeyedSingletonBoundKind::BoundedValue => {
1070 let me: KeyedSingleton<K, V, L, BoundedValue> = KeyedSingleton::new(
1071 self.location.clone(),
1072 self.ir_node.replace(HydroNode::Placeholder),
1073 );
1074
1075 let result = sliced! {
1076 let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1077 let thresh_snapshot =
1078 use(thresholds, nondet!(/** thresholds are deterministic */));
1079 let mut already_crossed =
1080 use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1081
1082 let joined = thresh_snapshot.entries().join(snapshot.entries());
1083 let passed = joined
1084 .filter(q!(|(_, (thresh, val))| *val >= *thresh))
1085 .map(q!(|(k, (thresh, _))| (k, thresh)));
1086
1087 let newly_crossed = passed.anti_join(already_crossed.clone());
1088 already_crossed =
1089 already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1090
1091 newly_crossed.into_keyed()
1092 };
1093
1094 KeyedStream::new(
1095 self_location,
1096 result.ir_node.replace(HydroNode::Placeholder),
1097 )
1098 }
1099 _ => {
1100 unreachable!(
1101 "IsKeyedMonotonic is only implemented for Bounded, BoundedValue, and MonotonicValue"
1102 )
1103 }
1104 }
1105 }
1106
1107 /// Like [`Self::threshold_greater_or_equal`], but uses a single [`Singleton`] threshold
1108 /// shared across all keys. Emits a `(K, V)` event for each key the first time that key's
1109 /// value becomes >= the threshold. The emitted value is the threshold itself.
1110 ///
1111 /// Because the threshold is a [`Bounded`] singleton, it is a compile-time constant and
1112 /// does not carry ongoing memory cost.
1113 ///
1114 /// # Example
1115 /// ```rust,ignore
1116 /// use hydro_lang::prelude::*;
1117 ///
1118 /// let counts: KeyedSingleton<u32, usize, _, MonotonicValue> = events.into_keyed()
1119 /// .fold(q!(|| 0), q!(|acc, _| *acc += 1, monotone = manual_proof!(/** +1 */)));
1120 ///
1121 /// let threshold = process.singleton(q!(5usize));
1122 /// let crossed = counts.threshold_greater_or_equal_uniform(threshold);
1123 /// ```
1124 pub fn threshold_greater_or_equal_uniform(
1125 self,
1126 threshold: Singleton<V, L, Bounded>,
1127 ) -> KeyedStream<K, V, L, B::UnderlyingBound, NoOrder, ExactlyOnce>
1128 where
1129 K: Clone + Eq + Hash,
1130 V: Clone + PartialOrd,
1131 B: IsKeyedMonotonic,
1132 {
1133 let self_location = self.location.clone();
1134 match B::bound_kind() {
1135 KeyedSingletonBoundKind::Bounded => {
1136 let me: KeyedSingleton<K, V, L, Bounded> = KeyedSingleton::new(
1137 self.location.clone(),
1138 self.ir_node.replace(HydroNode::Placeholder),
1139 );
1140 let result = me
1141 .entries()
1142 .cross_singleton(threshold)
1143 .filter_map(q!(|((k, val), thresh)| {
1144 if val >= thresh {
1145 Some((k, thresh))
1146 } else {
1147 None
1148 }
1149 }))
1150 .into_keyed();
1151 KeyedStream::new(
1152 result.location.clone(),
1153 result.ir_node.replace(HydroNode::Placeholder),
1154 )
1155 }
1156 KeyedSingletonBoundKind::MonotonicValue => {
1157 let me: KeyedSingleton<K, V, L, MonotonicValue> = KeyedSingleton::new(
1158 self.location.clone(),
1159 self.ir_node.replace(HydroNode::Placeholder),
1160 );
1161
1162 let result = sliced! {
1163 let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1164 let mut already_crossed =
1165 use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1166
1167 let tick = snapshot.location().clone();
1168 let thresh_in_tick = threshold.clone_into_tick(&tick);
1169
1170 let crossing = snapshot
1171 .entries()
1172 .cross_singleton(thresh_in_tick)
1173 .filter_map(q!(|((k, val), thresh)| {
1174 if val >= thresh {
1175 Some((k, thresh))
1176 } else {
1177 None
1178 }
1179 }));
1180
1181 let newly_crossed = crossing.anti_join(already_crossed.clone());
1182 already_crossed =
1183 already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1184
1185 newly_crossed.into_keyed()
1186 };
1187
1188 KeyedStream::new(
1189 self_location,
1190 result.ir_node.replace(HydroNode::Placeholder),
1191 )
1192 }
1193 KeyedSingletonBoundKind::BoundedValue => {
1194 let me: KeyedSingleton<K, V, L, BoundedValue> = KeyedSingleton::new(
1195 self.location.clone(),
1196 self.ir_node.replace(HydroNode::Placeholder),
1197 );
1198
1199 let result = sliced! {
1200 let snapshot = use(me, nondet!(/** thresholds are deterministic */));
1201 let mut already_crossed =
1202 use::state_null::<Stream<K, Tick<_>, Bounded, NoOrder>>();
1203
1204 let tick = snapshot.location().clone();
1205 let thresh_in_tick = threshold.clone_into_tick(&tick);
1206
1207 let crossing = snapshot
1208 .entries()
1209 .cross_singleton(thresh_in_tick)
1210 .filter_map(q!(|((k, val), thresh)| {
1211 if val >= thresh {
1212 Some((k, thresh))
1213 } else {
1214 None
1215 }
1216 }));
1217
1218 let newly_crossed = crossing.anti_join(already_crossed.clone());
1219 already_crossed =
1220 already_crossed.chain(newly_crossed.clone().map(q!(|(k, _)| k)));
1221
1222 newly_crossed.into_keyed()
1223 };
1224
1225 KeyedStream::new(
1226 self_location,
1227 result.ir_node.replace(HydroNode::Placeholder),
1228 )
1229 }
1230 _ => {
1231 unreachable!(
1232 "IsKeyedMonotonic is only implemented for Bounded, BoundedValue, and MonotonicValue"
1233 )
1234 }
1235 }
1236 }
1237}
1238
1239impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
1240 KeyedSingleton<K, V, L, B>
1241{
1242 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
1243 ///
1244 /// The value for each key must be bounded, otherwise the resulting stream elements would be
1245 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
1246 /// into the output.
1247 ///
1248 /// # Example
1249 /// ```rust
1250 /// # #[cfg(feature = "deploy")] {
1251 /// # use hydro_lang::prelude::*;
1252 /// # use futures::StreamExt;
1253 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1254 /// let keyed_singleton = // { 1: 2, 2: 4 }
1255 /// # process
1256 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1257 /// # .into_keyed()
1258 /// # .first();
1259 /// keyed_singleton.entries()
1260 /// # }, |mut stream| async move {
1261 /// // (1, 2), (2, 4) in any order
1262 /// # let mut results = Vec::new();
1263 /// # for _ in 0..2 {
1264 /// # results.push(stream.next().await.unwrap());
1265 /// # }
1266 /// # results.sort();
1267 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1268 /// # }));
1269 /// # }
1270 /// ```
1271 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1272 self.into_keyed_stream().entries()
1273 }
1274
1275 /// Flattens the keyed singleton into an unordered stream of just the values.
1276 ///
1277 /// The value for each key must be bounded, otherwise the resulting stream elements would be
1278 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
1279 /// into the output.
1280 ///
1281 /// # Example
1282 /// ```rust
1283 /// # #[cfg(feature = "deploy")] {
1284 /// # use hydro_lang::prelude::*;
1285 /// # use futures::StreamExt;
1286 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1287 /// let keyed_singleton = // { 1: 2, 2: 4 }
1288 /// # process
1289 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1290 /// # .into_keyed()
1291 /// # .first();
1292 /// keyed_singleton.values()
1293 /// # }, |mut stream| async move {
1294 /// // 2, 4 in any order
1295 /// # let mut results = Vec::new();
1296 /// # for _ in 0..2 {
1297 /// # results.push(stream.next().await.unwrap());
1298 /// # }
1299 /// # results.sort();
1300 /// # assert_eq!(results, vec![2, 4]);
1301 /// # }));
1302 /// # }
1303 /// ```
1304 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1305 let map_f = q!(|(_, v)| v)
1306 .splice_fn1_ctx::<(K, V), V>(&self.location)
1307 .into();
1308
1309 Stream::new(
1310 self.location.clone(),
1311 HydroNode::Map {
1312 f: map_f,
1313 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1314 metadata: self.location.new_node_metadata(Stream::<
1315 V,
1316 L,
1317 B::UnderlyingBound,
1318 NoOrder,
1319 ExactlyOnce,
1320 >::collection_kind()),
1321 },
1322 )
1323 }
1324
1325 /// Flattens the keyed singleton into an unordered stream of just the keys.
1326 ///
1327 /// The value for each key must be bounded, otherwise the removal of keys would result in
1328 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
1329 /// into the output.
1330 ///
1331 /// # Example
1332 /// ```rust
1333 /// # #[cfg(feature = "deploy")] {
1334 /// # use hydro_lang::prelude::*;
1335 /// # use futures::StreamExt;
1336 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1337 /// let keyed_singleton = // { 1: 2, 2: 4 }
1338 /// # process
1339 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1340 /// # .into_keyed()
1341 /// # .first();
1342 /// keyed_singleton.keys()
1343 /// # }, |mut stream| async move {
1344 /// // 1, 2 in any order
1345 /// # let mut results = Vec::new();
1346 /// # for _ in 0..2 {
1347 /// # results.push(stream.next().await.unwrap());
1348 /// # }
1349 /// # results.sort();
1350 /// # assert_eq!(results, vec![1, 2]);
1351 /// # }));
1352 /// # }
1353 /// ```
1354 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
1355 self.entries().map(q!(|(k, _)| k))
1356 }
1357
1358 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
1359 /// entries whose keys are not in the provided stream.
1360 ///
1361 /// # Example
1362 /// ```rust
1363 /// # #[cfg(feature = "deploy")] {
1364 /// # use hydro_lang::prelude::*;
1365 /// # use futures::StreamExt;
1366 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1367 /// let tick = process.tick();
1368 /// let keyed_singleton = // { 1: 2, 2: 4 }
1369 /// # process
1370 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1371 /// # .into_keyed()
1372 /// # .first()
1373 /// # .batch(&tick, nondet!(/** test */));
1374 /// let keys_to_remove = process
1375 /// .source_iter(q!(vec![1]))
1376 /// .batch(&tick, nondet!(/** test */));
1377 /// keyed_singleton.filter_key_not_in(keys_to_remove)
1378 /// # .entries().all_ticks()
1379 /// # }, |mut stream| async move {
1380 /// // { 2: 4 }
1381 /// # for w in vec![(2, 4)] {
1382 /// # assert_eq!(stream.next().await.unwrap(), w);
1383 /// # }
1384 /// # }));
1385 /// # }
1386 /// ```
1387 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1388 self,
1389 other: Stream<K, L, Bounded, O2, R2>,
1390 ) -> Self
1391 where
1392 K: Hash + Eq,
1393 {
1394 check_matching_location(&self.location, &other.location);
1395
1396 KeyedSingleton::new(
1397 self.location.clone(),
1398 HydroNode::AntiJoin {
1399 pos: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1400 neg: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
1401 metadata: self.location.new_node_metadata(Self::collection_kind()),
1402 },
1403 )
1404 }
1405
1406 /// An operator which allows you to "inspect" each value of a keyed singleton without
1407 /// modifying it. The closure `f` is called on a reference to each value. This is
1408 /// mainly useful for debugging, and should not be used to generate side-effects.
1409 ///
1410 /// # Example
1411 /// ```rust
1412 /// # #[cfg(feature = "deploy")] {
1413 /// # use hydro_lang::prelude::*;
1414 /// # use futures::StreamExt;
1415 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1416 /// let keyed_singleton = // { 1: 2, 2: 4 }
1417 /// # process
1418 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1419 /// # .into_keyed()
1420 /// # .first();
1421 /// keyed_singleton
1422 /// .inspect(q!(|v| println!("{}", v)))
1423 /// # .entries()
1424 /// # }, |mut stream| async move {
1425 /// // { 1: 2, 2: 4 }
1426 /// # for w in vec![(1, 2), (2, 4)] {
1427 /// # assert_eq!(stream.next().await.unwrap(), w);
1428 /// # }
1429 /// # }));
1430 /// # }
1431 /// ```
1432 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1433 where
1434 F: Fn(&V) + 'a,
1435 {
1436 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1437 let inspect_f = q!({
1438 let orig = f;
1439 move |t: &(_, _)| orig(&t.1)
1440 })
1441 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1442 .into();
1443
1444 KeyedSingleton::new(
1445 self.location.clone(),
1446 HydroNode::Inspect {
1447 f: inspect_f,
1448 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1449 metadata: self.location.new_node_metadata(Self::collection_kind()),
1450 },
1451 )
1452 }
1453
1454 /// An operator which allows you to "inspect" each entry of a keyed singleton without
1455 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1456 /// mainly useful for debugging, and should not be used to generate side-effects.
1457 ///
1458 /// # Example
1459 /// ```rust
1460 /// # #[cfg(feature = "deploy")] {
1461 /// # use hydro_lang::prelude::*;
1462 /// # use futures::StreamExt;
1463 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1464 /// let keyed_singleton = // { 1: 2, 2: 4 }
1465 /// # process
1466 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1467 /// # .into_keyed()
1468 /// # .first();
1469 /// keyed_singleton
1470 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1471 /// # .entries()
1472 /// # }, |mut stream| async move {
1473 /// // { 1: 2, 2: 4 }
1474 /// # for w in vec![(1, 2), (2, 4)] {
1475 /// # assert_eq!(stream.next().await.unwrap(), w);
1476 /// # }
1477 /// # }));
1478 /// # }
1479 /// ```
1480 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1481 where
1482 F: Fn(&(K, V)) + 'a,
1483 {
1484 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1485
1486 KeyedSingleton::new(
1487 self.location.clone(),
1488 HydroNode::Inspect {
1489 f: inspect_f,
1490 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1491 metadata: self.location.new_node_metadata(Self::collection_kind()),
1492 },
1493 )
1494 }
1495
1496 /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1497 ///
1498 /// Because this method requires values to be bounded, the output [`Optional`] will only be
1499 /// asynchronously updated if a new key is added that is higher than the previous max key.
1500 ///
1501 /// # Example
1502 /// ```rust
1503 /// # #[cfg(feature = "deploy")] {
1504 /// # use hydro_lang::prelude::*;
1505 /// # use futures::StreamExt;
1506 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1507 /// let tick = process.tick();
1508 /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1509 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1510 /// # .into_keyed()
1511 /// # .first();
1512 /// keyed_singleton.get_max_key()
1513 /// # .sample_eager(nondet!(/** test */))
1514 /// # }, |mut stream| async move {
1515 /// // (2, 456)
1516 /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1517 /// # }));
1518 /// # }
1519 /// ```
1520 pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1521 where
1522 K: Ord,
1523 {
1524 self.entries()
1525 .assume_ordering_trusted(nondet!(
1526 /// There is only one element associated with each key, and the keys are totallly
1527 /// ordered so we will produce a deterministic value. The closure technically
1528 /// isn't commutative in the case where both passed entries have the same key
1529 /// but different values.
1530 ///
1531 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1532 /// the two inputs do not have the same key.
1533 ))
1534 .reduce(q!(
1535 move |curr, new| {
1536 if new.0 > curr.0 {
1537 *curr = new;
1538 }
1539 },
1540 idempotent = manual_proof!(/** repeated elements are ignored */)
1541 ))
1542 }
1543
1544 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1545 /// element, the value.
1546 ///
1547 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1548 ///
1549 /// # Example
1550 /// ```rust
1551 /// # #[cfg(feature = "deploy")] {
1552 /// # use hydro_lang::prelude::*;
1553 /// # use futures::StreamExt;
1554 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1555 /// let keyed_singleton = // { 1: 2, 2: 4 }
1556 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1557 /// # .into_keyed()
1558 /// # .first();
1559 /// keyed_singleton
1560 /// .clone()
1561 /// .into_keyed_stream()
1562 /// .merge_unordered(
1563 /// keyed_singleton.into_keyed_stream()
1564 /// )
1565 /// # .entries()
1566 /// # }, |mut stream| async move {
1567 /// /// // { 1: [2, 2], 2: [4, 4] }
1568 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1569 /// # assert_eq!(stream.next().await.unwrap(), w);
1570 /// # }
1571 /// # }));
1572 /// # }
1573 /// ```
1574 pub fn into_keyed_stream(
1575 self,
1576 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1577 KeyedStream::new(
1578 self.location.clone(),
1579 HydroNode::Cast {
1580 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1581 metadata: self.location.new_node_metadata(KeyedStream::<
1582 K,
1583 V,
1584 L,
1585 B::UnderlyingBound,
1586 TotalOrder,
1587 ExactlyOnce,
1588 >::collection_kind()),
1589 },
1590 )
1591 }
1592}
1593
1594impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1595where
1596 L: Location<'a>,
1597{
1598 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1599 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1600 ///
1601 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1602 /// processed before an acknowledgement is emitted.
1603 pub fn atomic(self) -> KeyedSingleton<K, V, Atomic<L>, B> {
1604 let id = self.location.flow_state().borrow_mut().next_clock_id();
1605 let out_location = Atomic {
1606 tick: Tick {
1607 id,
1608 l: self.location.clone(),
1609 },
1610 };
1611 KeyedSingleton::new(
1612 out_location.clone(),
1613 HydroNode::BeginAtomic {
1614 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1615 metadata: out_location
1616 .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1617 },
1618 )
1619 }
1620}
1621
1622impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1623where
1624 L: Location<'a>,
1625{
1626 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1627 /// See [`KeyedSingleton::atomic`] for more details.
1628 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1629 KeyedSingleton::new(
1630 self.location.tick.l.clone(),
1631 HydroNode::EndAtomic {
1632 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1633 metadata: self
1634 .location
1635 .tick
1636 .l
1637 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1638 },
1639 )
1640 }
1641}
1642
1643impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1644 /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1645 /// tick `T` always has the entries of `self` at tick `T - 1`.
1646 ///
1647 /// At tick `0`, the output has no entries, since there is no previous tick.
1648 ///
1649 /// This operator enables stateful iterative processing with ticks, by sending data from one
1650 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1651 ///
1652 /// # Example
1653 /// ```rust
1654 /// # #[cfg(feature = "deploy")] {
1655 /// # use hydro_lang::prelude::*;
1656 /// # use futures::StreamExt;
1657 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1658 /// let tick = process.tick();
1659 /// # // ticks are lazy by default, forces the second tick to run
1660 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1661 /// # let batch_first_tick = process
1662 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1663 /// # .batch(&tick, nondet!(/** test */))
1664 /// # .into_keyed();
1665 /// # let batch_second_tick = process
1666 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1667 /// # .batch(&tick, nondet!(/** test */))
1668 /// # .into_keyed()
1669 /// # .defer_tick(); // appears on the second tick
1670 /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1671 /// # batch_first_tick.chain(batch_second_tick).first();
1672 /// input_batch.clone().filter_key_not_in(
1673 /// input_batch.defer_tick().keys() // keys present in the previous tick
1674 /// )
1675 /// # .entries().all_ticks()
1676 /// # }, |mut stream| async move {
1677 /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1678 /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1679 /// # assert_eq!(stream.next().await.unwrap(), w);
1680 /// # }
1681 /// # }));
1682 /// # }
1683 /// ```
1684 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1685 KeyedSingleton::new(
1686 self.location.clone(),
1687 HydroNode::DeferTick {
1688 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1689 metadata: self
1690 .location
1691 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1692 },
1693 )
1694 }
1695}
1696
1697impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1698where
1699 L: Location<'a>,
1700{
1701 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1702 /// point in time.
1703 ///
1704 /// # Non-Determinism
1705 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1706 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1707 pub fn snapshot<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1708 self,
1709 tick: &Tick<L2>,
1710 _nondet: NonDet,
1711 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1712 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1713 KeyedSingleton::new(
1714 tick.drop_consistency(),
1715 HydroNode::Batch {
1716 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1717 metadata: tick
1718 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1719 },
1720 )
1721 }
1722}
1723
1724impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1725where
1726 L: Location<'a>,
1727{
1728 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1729 /// state of the keyed singleton being atomically processed.
1730 ///
1731 /// # Non-Determinism
1732 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1733 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1734 pub fn snapshot_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1735 self,
1736 tick: &Tick<L2>,
1737 _nondet: NonDet,
1738 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1739 KeyedSingleton::new(
1740 tick.drop_consistency(),
1741 HydroNode::Batch {
1742 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1743 metadata: tick
1744 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1745 },
1746 )
1747 }
1748}
1749
1750impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1751where
1752 L: Location<'a>,
1753{
1754 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1755 ///
1756 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1757 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1758 /// is filtered out.
1759 ///
1760 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1761 /// not modify or take ownership of the values. If you need to modify the values while filtering
1762 /// use [`KeyedSingleton::filter_map`] instead.
1763 ///
1764 /// # Example
1765 /// ```rust
1766 /// # #[cfg(feature = "deploy")] {
1767 /// # use hydro_lang::prelude::*;
1768 /// # use futures::StreamExt;
1769 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1770 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1771 /// # process
1772 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1773 /// # .into_keyed()
1774 /// # .first();
1775 /// keyed_singleton.filter(q!(|&v| v > 1))
1776 /// # .entries()
1777 /// # }, |mut stream| async move {
1778 /// // { 1: 2, 2: 4 }
1779 /// # let mut results = Vec::new();
1780 /// # for _ in 0..2 {
1781 /// # results.push(stream.next().await.unwrap());
1782 /// # }
1783 /// # results.sort();
1784 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1785 /// # }));
1786 /// # }
1787 /// ```
1788 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1789 where
1790 F: Fn(&V) -> bool + 'a,
1791 {
1792 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1793 let filter_f = q!({
1794 let orig = f;
1795 move |t: &(_, _)| orig(&t.1)
1796 })
1797 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1798 .into();
1799
1800 KeyedSingleton::new(
1801 self.location.clone(),
1802 HydroNode::Filter {
1803 f: filter_f,
1804 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1805 metadata: self
1806 .location
1807 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1808 },
1809 )
1810 }
1811
1812 /// An operator that both filters and maps values. It yields only the key-value pairs where
1813 /// the supplied closure `f` returns `Some(value)`.
1814 ///
1815 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1816 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1817 /// If it returns `None`, the key-value pair is filtered out.
1818 ///
1819 /// # Example
1820 /// ```rust
1821 /// # #[cfg(feature = "deploy")] {
1822 /// # use hydro_lang::prelude::*;
1823 /// # use futures::StreamExt;
1824 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1825 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1826 /// # process
1827 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1828 /// # .into_keyed()
1829 /// # .first();
1830 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1831 /// # .entries()
1832 /// # }, |mut stream| async move {
1833 /// // { 1: 42, 3: 100 }
1834 /// # let mut results = Vec::new();
1835 /// # for _ in 0..2 {
1836 /// # results.push(stream.next().await.unwrap());
1837 /// # }
1838 /// # results.sort();
1839 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1840 /// # }));
1841 /// # }
1842 /// ```
1843 pub fn filter_map<F, U>(
1844 self,
1845 f: impl IntoQuotedMut<'a, F, L> + Copy,
1846 ) -> KeyedSingleton<K, U, L, B::EraseMonotonic>
1847 where
1848 F: Fn(V) -> Option<U> + 'a,
1849 {
1850 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1851 let filter_map_f = q!({
1852 let orig = f;
1853 move |(k, v)| orig(v).map(|o| (k, o))
1854 })
1855 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1856 .into();
1857
1858 KeyedSingleton::new(
1859 self.location.clone(),
1860 HydroNode::FilterMap {
1861 f: filter_map_f,
1862 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1863 metadata: self.location.new_node_metadata(KeyedSingleton::<
1864 K,
1865 U,
1866 L,
1867 B::EraseMonotonic,
1868 >::collection_kind()),
1869 },
1870 )
1871 }
1872
1873 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1874 /// arrived since the previous batch was released.
1875 ///
1876 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1877 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1878 ///
1879 /// # Non-Determinism
1880 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1881 /// has a non-deterministic set of key-value pairs.
1882 pub fn batch<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1883 self,
1884 tick: &Tick<L2>,
1885 _nondet: NonDet,
1886 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1887 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1888 KeyedSingleton::new(
1889 tick.drop_consistency(),
1890 HydroNode::Batch {
1891 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1892 metadata: tick
1893 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1894 },
1895 )
1896 }
1897}
1898
1899impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1900where
1901 L: Location<'a>,
1902{
1903 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1904 /// atomically processed.
1905 ///
1906 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1907 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1908 ///
1909 /// # Non-Determinism
1910 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1911 /// has a non-deterministic set of key-value pairs.
1912 pub fn batch_atomic<L2: Location<'a, DropConsistency = L::DropConsistency>>(
1913 self,
1914 tick: &Tick<L2>,
1915 nondet: NonDet,
1916 ) -> KeyedSingleton<K, V, Tick<L::DropConsistency>, Bounded> {
1917 let _ = nondet;
1918 KeyedSingleton::new(
1919 tick.drop_consistency(),
1920 HydroNode::Batch {
1921 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1922 metadata: tick
1923 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1924 },
1925 )
1926 }
1927}
1928
#[cfg(test)]
mod tests {
    // Tests for `KeyedSingleton` operators.
    // - Tests gated on the `deploy` feature build a flow, deploy it to localhost via
    //   `hydro_deploy`, and talk to it through an external process.
    // - Tests gated on the `sim` feature run the flow under `flow.sim().exhaustive(...)`.
    //   The returned `count` is presumably the number of explored executions; one test pins
    //   it exactly, so changes to the dataflow those tests build may change it.
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    // `key_count` on a keyed singleton built with `first()` (one value fixed per key).
    // The sample taken before any input reports 0, and each new key increments the count.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // No input yet: the keyed singleton is empty.
        assert_eq!(external_out.next().await.unwrap(), 0);

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);
    }

    // `key_count` on a keyed singleton whose values keep changing (per-key counts from `fold`).
    // Repeated keys update existing entries without changing the key count; only new keys
    // increase it.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_unbounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 0);

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        // Key 1 again: its value changes but the number of keys does not.
        external_in.send((1, 2)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 1);

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 2);

        external_in.send((3, 1)).await.unwrap();
        assert_eq!(external_out.next().await.unwrap(), 3);
    }

    // `into_singleton` on a `first()`-built keyed singleton: each sample is the full map of
    // entries so far, starting from an empty `HashMap`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_bounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        assert_eq!(
            external_out.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 1)].into_iter().collect()
        );

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 1), (2, 2)].into_iter().collect()
        );
    }

    // `into_singleton` on per-key counts from `fold`. The fold ignores the sent value and adds
    // 1 per input, so each map value is the number of inputs seen for that key.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_in = nodes.connect(input_port).await;
        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        assert_eq!(
            external_out.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 1)].into_iter().collect()
        );

        // Second input for key 1: its count goes to 2 (the sent value 2 is ignored).
        external_in.send((1, 2)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 2)].into_iter().collect()
        );

        external_in.send((2, 2)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 2), (2, 1)].into_iter().collect()
        );

        external_in.send((1, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 3), (2, 1)].into_iter().collect()
        );

        external_in.send((3, 1)).await.unwrap();
        assert_eq!(
            external_out.next().await.unwrap(),
            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
        );
    }

    // Snapshots per-key counts into a tick and releases them with `all_ticks`. Across all
    // simulated executions, the largest sorted output must be key 2 with count 1. The
    // execution count is pinned at 8, so it is sensitive to the exact dataflow built here.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unbounded_singleton_snapshot() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input();
        let output = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .snapshot(&node.tick(), nondet!(/** test */))
            .entries()
            .all_ticks()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            input_port.send((1, 123));
            input_port.send((1, 456));
            input_port.send((2, 123));

            let all = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(all.last().unwrap(), &(2, 1));
        });

        assert_eq!(count, 8);
    }

    // Joins a keyed singleton with a keyed stream inside one tick. Key 3 has a request but no
    // entry in `keyed_data`, so only keys 1 and 2 produce output.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn join_keyed_stream() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = node.tick();
        let keyed_data = node
            .source_iter(q!(vec![(1, 10), (2, 20)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */))
            .first();
        let requests = node
            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */));

        let out = keyed_data
            .join_keyed_stream(requests)
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut external_out = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Output order is not asserted, so collect the expected two results and sort them.
        let mut results = vec![];
        for _ in 0..2 {
            results.push(external_out.next().await.unwrap());
        }
        results.sort();

        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
    }

    // Per-key thresholds against a `MonotonicValue` keyed singleton. The emitted entry carries
    // the threshold (5), not the accumulated value (6).
    #[cfg(feature = "sim")]
    #[test]
    fn threshold_greater_or_equal_monotonic() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();

        // Create a monotonically increasing keyed singleton via fold with monotone proof
        let counts: super::KeyedSingleton<u32, usize, _, super::MonotonicValue> =
            input.into_keyed().fold(
                q!(|| 0usize),
                q!(
                    |acc, v| *acc += v,
                    monotone = crate::properties::manual_proof!(/** += is monotonic */)
                ),
            );

        // BoundedValue keyed singleton of thresholds (from .first() on unbounded stream)
        let thresholds = thresh_input.into_keyed().first();

        let output = counts
            .threshold_greater_or_equal(thresholds)
            .entries()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            // Set thresholds: key 1 needs value >= 5, key 2 needs value >= 10
            thresh_port.send((1, 5));
            thresh_port.send((2, 10));

            // key 1 gets increments: 3 + 3 = 6, which is >= 5 ✓
            input_port.send((1, 3));
            input_port.send((1, 3));
            // key 2 gets increments: 3 + 3 = 6, which is < 10 ✗
            input_port.send((2, 3));
            input_port.send((2, 3));

            let results = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(results, vec![(1, 5)]);
        });

        assert!(count > 0);
    }

    // Single uniform threshold singleton applied to every key of a `MonotonicValue` keyed
    // singleton; again the output carries the threshold value.
    #[cfg(feature = "sim")]
    #[test]
    fn threshold_greater_or_equal_uniform() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();

        let counts: super::KeyedSingleton<u32, usize, _, super::MonotonicValue> =
            input.into_keyed().fold(
                q!(|| 0usize),
                q!(
                    |acc, v| *acc += v,
                    monotone = crate::properties::manual_proof!(/** += is monotonic */)
                ),
            );

        // Uniform threshold: all keys need value >= 5
        let threshold = node.singleton(q!(5usize));

        let output = counts
            .threshold_greater_or_equal_uniform(threshold)
            .entries()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            // key 1: 3 + 3 = 6 >= 5 ✓
            input_port.send((1, 3));
            input_port.send((1, 3));
            // key 2: 2 + 2 = 4 < 5 ✗
            input_port.send((2, 2));
            input_port.send((2, 2));

            let results = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(results, vec![(1, 5)]);
        });

        assert!(count > 0);
    }

    // Per-key thresholds where both sides are `first()`-built. Expected output (1, 3) shows
    // the threshold (3) is emitted rather than the value (5).
    #[cfg(feature = "sim")]
    #[test]
    fn threshold_greater_or_equal_bounded_value() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();
        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();

        // BoundedValue keyed singleton (values fixed once per key via .first())
        let values = input.into_keyed().first();

        // BoundedValue keyed singleton of thresholds
        let thresholds = thresh_input.into_keyed().first();

        let output = values
            .threshold_greater_or_equal(thresholds)
            .entries()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            // Set thresholds: key 1 needs >= 3, key 2 needs >= 10
            thresh_port.send((1, 3));
            thresh_port.send((2, 10));

            // key 1 gets value 5 >= 3 ✓, key 2 gets value 4 < 10 ✗
            input_port.send((1, 5));
            input_port.send((2, 4));

            let results = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(results, vec![(1, 3)]);
        });

        assert!(count > 0);
    }

    // Uniform threshold against a `first()`-built keyed singleton.
    #[cfg(feature = "sim")]
    #[test]
    fn threshold_greater_or_equal_uniform_bounded_value() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input::<(u32, usize), _, _>();

        // BoundedValue keyed singleton (values fixed once per key via .first())
        let values = input.into_keyed().first();

        // Uniform threshold: all keys need value >= 5
        let threshold = node.singleton(q!(5usize));

        let output = values
            .threshold_greater_or_equal_uniform(threshold)
            .entries()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            // key 1 gets value 7 >= 5 ✓, key 2 gets value 3 < 5 ✗
            input_port.send((1, 7));
            input_port.send((2, 3));

            let results = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(results, vec![(1, 5)]);
        });

        assert!(count > 0);
    }

    // Per-key thresholds (from a sim input) against values known up front from `source_iter`.
    // Key types unify with the thresholds' `u32` keys through the operator.
    #[cfg(feature = "sim")]
    #[test]
    fn threshold_greater_or_equal_bounded() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        // Bounded keyed singleton (fully known upfront)
        let values = node
            .source_iter(q!(vec![(1, 6usize), (2, 4usize)]))
            .into_keyed()
            .first();

        // BoundedValue thresholds (from async source)
        let (thresh_port, thresh_input) = node.sim_input::<(u32, usize), _, _>();
        let thresholds = thresh_input.into_keyed().first();

        let output = values
            .threshold_greater_or_equal(thresholds)
            .entries()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            thresh_port.send((1, 5));
            thresh_port.send((2, 10));

            // key 1: 6 >= 5 ✓, key 2: 4 < 10 ✗
            let results = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(results, vec![(1, 5)]);
        });

        assert!(count > 0);
    }

    // Uniform threshold against values known up front; the simulation sends no inputs at all.
    #[cfg(feature = "sim")]
    #[test]
    fn threshold_greater_or_equal_uniform_bounded() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let values = node
            .source_iter(q!(vec![(1, 6usize), (2, 4usize)]))
            .into_keyed()
            .first();
        let threshold = node.singleton(q!(5usize));

        let output = values
            .threshold_greater_or_equal_uniform(threshold)
            .entries()
            .sim_output();

        let count = flow.sim().exhaustive(async || {
            // key 1: 6 >= 5 ✓, key 2: 4 < 5 ✗
            let results = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(results, vec![(1, 5)]);
        });

        assert!(count > 0);
    }
}