1use core::panic;
2use std::cell::{Cell, RefCell};
3use std::collections::HashMap;
4#[cfg(feature = "build")]
5use std::collections::HashSet;
6use std::fmt::{Debug, Display};
7use std::hash::{Hash, Hasher};
8use std::ops::Deref;
9use std::rc::Rc;
10
11#[cfg(feature = "build")]
12use dfir_lang::graph::FlatGraphBuilder;
13#[cfg(feature = "build")]
14use proc_macro2::Span;
15use proc_macro2::TokenStream;
16use quote::ToTokens;
17#[cfg(feature = "build")]
18use quote::quote;
19#[cfg(feature = "build")]
20use slotmap::{SecondaryMap, SparseSecondaryMap};
21#[cfg(feature = "build")]
22use syn::parse_quote;
23
24#[cfg(feature = "build")]
25use crate::compile::builder::ClockId;
26#[cfg(feature = "build")]
27use crate::compile::builder::StmtId;
28use crate::compile::builder::{CycleId, ExternalPortId};
29#[cfg(feature = "build")]
30use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
31#[cfg(feature = "build")]
32use crate::handoff_ref::handoff_ref_ident;
33use crate::location::dynamic::{ClusterConsistency, LocationId};
34use crate::location::{LocationKey, NetworkHint};
35
36pub mod backtrace;
37use backtrace::Backtrace;
38
/// An expression bundled with the reference nodes it uses.
///
/// Hashing, `Debug` and `Display` for this type only consider `expr`; the
/// `singleton_refs` are carried alongside and consumed by `emit_tokens`.
pub struct ClosureExpr {
    /// The expression itself.
    pub(crate) expr: DebugExpr,
    /// Nodes referenced by the expression, each paired with a flag that is
    /// `true` when the reference is mutable (see `has_mut_ref`).
    ///
    /// Every node is expected to be a `HydroNode::Reference`; `Clone` and
    /// `emit_tokens` panic on any other variant.
    pub(crate) singleton_refs: Vec<(HydroNode, bool)>,
}
51
52impl Clone for ClosureExpr {
53 fn clone(&self) -> Self {
54 Self {
55 expr: self.expr.clone(),
56 singleton_refs: self
57 .singleton_refs
58 .iter()
59 .map(|(node, is_mut)| {
60 let HydroNode::Reference {
61 inner,
62 kind,
63 access_counter,
64 metadata,
65 } = node
66 else {
67 panic!("singleton_refs should only contain HydroNode::Reference");
68 };
69 (
70 HydroNode::Reference {
71 inner: SharedNode(Rc::clone(&inner.0)),
72 kind: *kind,
73 access_counter: access_counter.freeze(),
74 metadata: metadata.clone(),
75 },
76 *is_mut,
77 )
78 })
79 .collect(),
80 }
81 }
82}
83
84impl Hash for ClosureExpr {
85 fn hash<H: Hasher>(&self, state: &mut H) {
86 self.expr.hash(state);
87 }
91}
92
93impl serde::Serialize for ClosureExpr {
94 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
95 use serde::ser::SerializeStruct;
96 let mut s = serializer.serialize_struct("ClosureExpr", 2)?;
97 s.serialize_field("expr", &self.expr)?;
98 s.serialize_field(
99 "singleton_refs",
100 &SerializableSingletonRefs(&self.singleton_refs),
101 )?;
102 s.end()
103 }
104}
105
106struct SerializableSingletonRefs<'a>(&'a [(HydroNode, bool)]);
107
108impl serde::Serialize for SerializableSingletonRefs<'_> {
109 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
110 use serde::ser::SerializeSeq;
111 let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
112 for (node, is_mut) in self.0.iter() {
113 seq.serialize_element(&(node, is_mut))?;
114 }
115 seq.end()
116 }
117}
118
119impl Debug for ClosureExpr {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 Debug::fmt(&self.expr, f)
122 }
123}
124
125impl Display for ClosureExpr {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 Display::fmt(&self.expr, f)
128 }
129}
130
131impl From<syn::Expr> for ClosureExpr {
132 fn from(expr: syn::Expr) -> Self {
133 Self {
134 expr: DebugExpr(Box::new(expr)),
135 singleton_refs: Vec::new(),
136 }
137 }
138}
139
140impl From<DebugExpr> for ClosureExpr {
141 fn from(expr: DebugExpr) -> Self {
142 Self {
143 expr,
144 singleton_refs: Vec::new(),
145 }
146 }
147}
148
149impl ClosureExpr {
150 pub fn new(expr: DebugExpr, singleton_refs: Vec<(HydroNode, bool)>) -> Self {
151 Self {
152 expr,
153 singleton_refs,
154 }
155 }
156
157 pub fn has_mut_ref(&self) -> bool {
158 self.singleton_refs.iter().any(|(_, is_mut)| *is_mut)
159 }
160
161 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> Self {
162 Self {
163 expr: self.expr.clone(),
164 singleton_refs: self
165 .singleton_refs
166 .iter()
167 .map(|(node, is_mut)| (node.deep_clone(seen_tees), *is_mut))
168 .collect(),
169 }
170 }
171
172 pub fn transform_children(
173 &mut self,
174 transform: &mut impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
175 seen_tees: &mut SeenSharedNodes,
176 ) {
177 for (ref_node, _is_mut) in self.singleton_refs.iter_mut() {
178 transform(ref_node, seen_tees);
179 }
180 }
181
182 #[cfg(feature = "build")]
185 pub fn emit_tokens(&self, ident_stack: &mut Vec<syn::Ident>) -> TokenStream {
186 if self.singleton_refs.is_empty() {
187 self.expr.0.to_token_stream()
188 } else {
189 assert!(
190 ident_stack.len() >= self.singleton_refs.len(),
191 "ident_stack has {} entries but expected at least {} for singleton_refs",
192 ident_stack.len(),
193 self.singleton_refs.len()
194 );
195 let ref_idents = ident_stack.drain(ident_stack.len() - self.singleton_refs.len()..);
196
197 let mut let_bindings = Vec::new();
198 for ((i, (ref_node, is_mut)), ref_ident) in
199 self.singleton_refs.iter().enumerate().zip(ref_idents)
200 {
201 let HydroNode::Reference { access_counter, .. } = ref_node else {
202 panic!("ClosureExpression expected references to `HydroNode::Reference`");
203 };
204 let group = access_counter.frozen_group();
205 let local_ident = handoff_ref_ident(i);
207 let hash = proc_macro2::Punct::new('#', proc_macro2::Spacing::Alone);
208 let group_lit = proc_macro2::Literal::u32_unsuffixed(group);
209 let mut_token = is_mut.then(|| quote!(mut));
210 let binding = quote! {
211 let #local_ident = #hash {#group_lit} #mut_token #ref_ident;
212 };
213 let_bindings.push(binding);
214 }
215
216 let expr = &self.expr.0;
217 quote! {
218 {
219 #( #let_bindings )*
220 #expr
221 }
222 }
223 }
224 }
225}
226
/// Newtype around a boxed `syn::Expr`.
///
/// Its `Debug` impl prints the raw token stream, while `Display` (and
/// therefore `Serialize`) prints the simplified `q!(...)` form.
#[derive(Clone, Hash)]
pub struct DebugExpr(pub Box<syn::Expr>);
232
233impl serde::Serialize for DebugExpr {
234 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
235 serializer.serialize_str(&self.to_string())
236 }
237}
238
239impl From<syn::Expr> for DebugExpr {
240 fn from(expr: syn::Expr) -> Self {
241 Self(Box::new(expr))
242 }
243}
244
245impl Deref for DebugExpr {
246 type Target = syn::Expr;
247
248 fn deref(&self) -> &Self::Target {
249 &self.0
250 }
251}
252
253impl ToTokens for DebugExpr {
254 fn to_tokens(&self, tokens: &mut TokenStream) {
255 self.0.to_tokens(tokens);
256 }
257}
258
259impl Debug for DebugExpr {
260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 write!(f, "{}", self.0.to_token_stream())
262 }
263}
264
265impl Display for DebugExpr {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 let original = self.0.as_ref().clone();
268 let simplified = simplify_q_macro(original);
269
270 write!(f, "q!({})", quote::quote!(#simplified))
273 }
274}
275
276fn simplify_q_macro(expr: syn::Expr) -> syn::Expr {
278 if let syn::Expr::Call(ref call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
279 && is_stageleft_runtime_support_call(&path_expr.path)
281 && let syn::Expr::Block(b) = &call.args[0]
282 && b.block.stmts.len() == 3
283 && let Some(syn::Stmt::Expr(e, _)) = b.block.stmts.get(2)
284 {
286 let mut e = e.clone();
287 while let syn::Expr::Block(ref mut block) = e
288 && block.block.stmts.len() == 1
289 && let syn::Stmt::Expr(inner_e, _) = block.block.stmts.remove(0)
290 {
291 e = inner_e;
292 }
293
294 e
295 } else {
296 expr
297 }
298}
299
300fn is_stageleft_runtime_support_call(path: &syn::Path) -> bool {
301 if let Some(last_segment) = path.segments.last() {
303 let fn_name = last_segment.ident.to_string();
304 path.segments.len() > 2
305 && path.segments[0].ident == "stageleft"
306 && path.segments[1].ident == "runtime_support"
307 && fn_name.contains("_type_hint")
308 } else {
309 false
310 }
311}
312
/// Newtype around a boxed `syn::Type` whose `Debug` and `Serialize` output is
/// the type's token stream rendered as text.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct DebugType(pub Box<syn::Type>);
318
319impl From<syn::Type> for DebugType {
320 fn from(t: syn::Type) -> Self {
321 Self(Box::new(t))
322 }
323}
324
325impl Deref for DebugType {
326 type Target = syn::Type;
327
328 fn deref(&self) -> &Self::Target {
329 &self.0
330 }
331}
332
333impl ToTokens for DebugType {
334 fn to_tokens(&self, tokens: &mut TokenStream) {
335 self.0.to_tokens(tokens);
336 }
337}
338
339impl Debug for DebugType {
340 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
341 write!(f, "{}", self.0.to_token_stream())
342 }
343}
344
345impl serde::Serialize for DebugType {
346 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
347 serializer.serialize_str(&format!("{}", self.0.to_token_stream()))
348 }
349}
350
351fn serialize_backtrace_as_span<S: serde::Serializer>(
352 backtrace: &Backtrace,
353 serializer: S,
354) -> Result<S::Ok, S::Error> {
355 match backtrace.format_span() {
356 Some(span) => serializer.serialize_some(&span),
357 None => serializer.serialize_none(),
358 }
359}
360
361fn serialize_ident<S: serde::Serializer>(
362 ident: &syn::Ident,
363 serializer: S,
364) -> Result<S::Ok, S::Error> {
365 serializer.serialize_str(&ident.to_string())
366}
367
/// Instantiation state of a network-like IR element.
///
/// Elements start as `Building`; `HydroRoot::compile_network` replaces that
/// with `Finalized`, and `HydroRoot::connect_network` later runs the stored
/// connect closure. Only `Building` can be cloned or serialized; doing either
/// on `Finalized` panics.
pub enum DebugInstantiate {
    /// Not yet instantiated.
    Building,
    /// Instantiated; holds the generated sink/source expressions and the
    /// pending connect closure.
    Finalized(Box<DebugInstantiateFinalized>),
}
372
373impl serde::Serialize for DebugInstantiate {
374 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
375 match self {
376 DebugInstantiate::Building => {
377 serializer.serialize_unit_variant("DebugInstantiate", 0, "Building")
378 }
379 DebugInstantiate::Finalized(_) => {
380 panic!(
381 "cannot serialize DebugInstantiate::Finalized: contains non-serializable runtime state (closures)"
382 )
383 }
384 }
385 }
386}
387
/// Payload of `DebugInstantiate::Finalized`, produced by
/// `HydroRoot::compile_network`.
#[cfg_attr(
    not(feature = "build"),
    expect(
        dead_code,
        reason = "sink, source unused without `feature = \"build\"`."
    )
)]
pub struct DebugInstantiateFinalized {
    /// Expression for the sending side. `compile_network` stores a `DUMMY`
    /// placeholder here for external inputs.
    sink: syn::Expr,
    /// Expression for the receiving side. `compile_network` stores a `DUMMY`
    /// placeholder here for external outputs.
    source: syn::Expr,
    /// One-shot closure that `connect_network` takes out and runs; it is
    /// `None` once that has happened.
    connect_fn: Option<Box<dyn FnOnce()>>,
}
400
401impl From<DebugInstantiateFinalized> for DebugInstantiate {
402 fn from(f: DebugInstantiateFinalized) -> Self {
403 Self::Finalized(Box::new(f))
404 }
405}
406
407impl Debug for DebugInstantiate {
408 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409 write!(f, "<network instantiate>")
410 }
411}
412
413impl Hash for DebugInstantiate {
414 fn hash<H: Hasher>(&self, _state: &mut H) {
415 }
417}
418
419impl Clone for DebugInstantiate {
420 fn clone(&self) -> Self {
421 match self {
422 DebugInstantiate::Building => DebugInstantiate::Building,
423 DebugInstantiate::Finalized(_) => {
424 panic!("DebugInstantiate::Finalized should not be cloned")
425 }
426 }
427 }
428}
429
/// State of a `HydroSource::ClusterMembers` source.
///
/// `HydroRoot::compile_network` moves every `Uninit` source to either `Stream`
/// or `Tee`, and panics if it meets one that has already left `Uninit`.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum ClusterMembersState {
    /// Not yet processed by `compile_network`.
    Uninit,
    /// First occurrence of a given (observing location, target cluster) pair:
    /// holds the membership stream expression produced by the deploy backend.
    Stream(DebugExpr),
    /// Later occurrence of an already-seen pair: holds the observing location
    /// root and the target location instead of a new stream expression.
    Tee(LocationId, LocationId),
}
450
/// The kinds of source a `HydroNode::Source` can wrap.
///
/// NOTE(review): the exact semantics of each variant are defined by the code
/// generator, which is not visible here; the notes below are inferred from
/// names and from `HydroRoot::compile_network` and should be confirmed.
#[derive(Debug, Hash, Clone, serde::Serialize)]
pub enum HydroSource {
    /// Source driven by the given expression (presumably a stream).
    Stream(DebugExpr),
    /// Source fed by an external network connection (presumably).
    ExternalNetwork(),
    /// Source driven by the given expression (presumably an iterator).
    Iter(DebugExpr),
    /// NOTE(review): semantics not visible in this part of the file.
    Spin(),
    /// Membership of the cluster at the given location, plus its
    /// instantiation state.
    ClusterMembers(LocationId, ClusterMembersState),
    /// Embedded stream input, registered under this identifier by
    /// `compile_network`; serialized as the identifier's name.
    Embedded(#[serde(serialize_with = "serialize_ident")] syn::Ident),
    /// Embedded singleton input, registered under this identifier by
    /// `compile_network`; serialized as the identifier's name.
    EmbeddedSingleton(#[serde(serialize_with = "serialize_ident")] syn::Ident),
}
462
/// Backend hooks used to emit DFIR for IR operators whose lowering depends on
/// the target.
///
/// NOTE(review): the contracts below are inferred from the signatures and from
/// the `SecondaryMap<LocationKey, FlatGraphBuilder>` implementation in this
/// file; other implementations may attach more behaviour. Confirm before
/// relying on details.
#[cfg(feature = "build")]
pub trait DfirBuilder {
    /// Backend flag; the `SecondaryMap` implementation returns `false`.
    /// NOTE(review): the precise meaning is decided by callers not shown here.
    fn singleton_intermediates(&self) -> bool;

    /// Returns the DFIR graph builder associated with `location`.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;

    /// Emits DFIR that defines `out_ident` from `in_ident` for a batch
    /// operation between `in_location` and `out_location`.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
        fold_hooked_idents: &HashSet<String>,
    );
    /// Emits DFIR that defines `out_ident` from `in_ident` when a collection
    /// is yielded out of a tick (presumably; see the name).
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
    );

    /// Emits DFIR that defines `out_ident` from `in_ident` at the start of an
    /// atomic section (presumably; see the name).
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_location: &LocationId,
        op_meta: &HydroIrOpMetadata,
    );
    /// Emits DFIR that defines `out_ident` from `in_ident` at the end of an
    /// atomic section (presumably; see the name).
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    );

    /// Emits DFIR that defines `out_ident` from `in_ident` for a
    /// nondeterminism-observing operator, whose input and output collection
    /// kinds may differ.
    #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
    fn observe_nondet(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        out_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    );

    /// Emits DFIR that combines `first_ident` and `second_ident` into
    /// `out_ident`; `operator_tag`, when given, is passed to the graph builder.
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    );

    /// Emits both halves of a network edge: a sender at `from` that pipes
    /// `input_ident` (optionally through `serialize`) into `sink`, and a
    /// receiver at `to` that defines `out_ident` from `source` (optionally
    /// through `deserialize`).
    #[expect(clippy::too_many_arguments, reason = "TODO")]
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        networking_info: &crate::networking::NetworkingInfo,
    );

    /// Emits a receiver at `on` that defines `out_ident` from `source_expr`,
    /// optionally mapped through `deserialize`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Emits a sender at `on` that pipes `input_ident`, optionally mapped
    /// through `serialize`, into `sink_expr`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    );

    /// Optionally emits a hook for a fold input and returns an identifier for
    /// it; the `SecondaryMap` implementation emits nothing and returns `None`.
    /// NOTE(review): how callers use the returned identifier is not visible
    /// here.
    fn emit_fold_hook(
        &mut self,
        location: &LocationId,
        in_ident: &syn::Ident,
        in_kind: &CollectionKind,
        op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident>;

    /// Emits DFIR that defines `out_ident` from `in_ident` for a consistency
    /// assertion (presumably; see the name).
    fn assert_is_consistent(
        &mut self,
        trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    );

    /// Emits DFIR that defines `out_ident` from `in_ident` for an operator
    /// observed for mutation (presumably; see the name).
    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        op_meta: &HydroIrOpMetadata,
    );
}
603
/// Default backend: one `FlatGraphBuilder` per root location key. Most hooks
/// reduce to the pass-through statement `out = in;`.
#[cfg(feature = "build")]
impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
    /// Always `false` for this backend.
    fn singleton_intermediates(&self) -> bool {
        false
    }

    /// Returns the builder for the root of `location`, creating a default one
    /// if none exists yet.
    ///
    /// # Panics
    /// Panics with "location was removed" if the map rejects the key.
    fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
        let root_key = location.root().key();
        self.entry(root_key)
            .expect("location was removed")
            .or_default()
    }

    /// Bounded singleton/optional/keyed-singleton inputs are piped through
    /// `persist::<'static>()` (and must be at a top-level location); anything
    /// else is passed through unchanged.
    fn batch(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
        _fold_hooked_idents: &HashSet<String>,
    ) {
        let builder = self.get_dfir_mut(in_location.root());

        let needs_persist = in_kind.is_bounded()
            && matches!(
                in_kind,
                CollectionKind::Singleton { .. }
                    | CollectionKind::Optional { .. }
                    | CollectionKind::KeyedSingleton { .. }
            );
        if !needs_persist {
            dfir_passthrough_stmt(builder, &in_ident, out_ident);
            return;
        }

        assert!(in_location.is_top_level());
        builder.add_dfir(
            parse_quote! {
                #out_ident = #in_ident -> persist::<'static>();
            },
            None,
            None,
        );
    }

    /// Pass-through at the root of `in_location`.
    fn yield_from_tick(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        dfir_passthrough_stmt(builder, &in_ident, out_ident);
    }

    /// Pass-through at the root of `in_location`.
    fn begin_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_location: &LocationId,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        dfir_passthrough_stmt(builder, &in_ident, out_ident);
    }

    /// Pass-through at the root of `in_location`.
    fn end_atomic(
        &mut self,
        in_ident: syn::Ident,
        in_location: &LocationId,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
    ) {
        let builder = self.get_dfir_mut(in_location.root());
        dfir_passthrough_stmt(builder, &in_ident, out_ident);
    }

    /// Pass-through at `location`.
    fn observe_nondet(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _out_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(location);
        dfir_passthrough_stmt(builder, &in_ident, out_ident);
    }

    /// Emits a `union()` with the first input on port 0 and the second on
    /// port 1, tagged with `operator_tag`.
    fn merge_ordered(
        &mut self,
        location: &LocationId,
        first_ident: syn::Ident,
        second_ident: syn::Ident,
        out_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
        operator_tag: Option<&str>,
    ) {
        let builder = self.get_dfir_mut(location);
        builder.add_dfir(
            parse_quote! {
                #out_ident = union();
                #first_ident -> [0]#out_ident;
                #second_ident -> [1]#out_ident;
            },
            None,
            operator_tag,
        );
    }

    /// Emits the sender at `from` and the receiver at `to`. The two halves
    /// get distinct operator tags, `send<tag_id>` and `recv<tag_id>`.
    fn create_network(
        &mut self,
        from: &LocationId,
        to: &LocationId,
        input_ident: syn::Ident,
        out_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        sink: syn::Expr,
        source: syn::Expr,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
        _networking_info: &crate::networking::NetworkingInfo,
    ) {
        let send_tag = format!("send{}", tag_id);
        let recv_tag = format!("recv{}", tag_id);

        let sender_builder = self.get_dfir_mut(from);
        match serialize {
            Some(serialize_pipeline) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }

        let receiver_builder = self.get_dfir_mut(to);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits `out = source_stream(source_expr)` at `on`, optionally followed
    /// by `map(deserialize)`, tagged `recv<tag_id>`.
    fn create_external_source(
        &mut self,
        on: &LocationId,
        source_expr: syn::Expr,
        out_ident: &syn::Ident,
        deserialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let recv_tag = format!("recv{}", tag_id);
        let receiver_builder = self.get_dfir_mut(on);
        match deserialize {
            Some(deserialize_pipeline) => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
            None => {
                receiver_builder.add_dfir(
                    parse_quote! {
                        #out_ident = source_stream(#source_expr);
                    },
                    None,
                    Some(recv_tag.as_str()),
                );
            }
        }
    }

    /// Emits `input -> dest_sink(sink_expr)` at `on`, optionally preceded by
    /// `map(serialize)`, tagged `send<tag_id>`.
    fn create_external_output(
        &mut self,
        on: &LocationId,
        sink_expr: syn::Expr,
        input_ident: &syn::Ident,
        serialize: Option<&DebugExpr>,
        tag_id: StmtId,
    ) {
        let send_tag = format!("send{}", tag_id);
        let sender_builder = self.get_dfir_mut(on);
        match serialize {
            Some(serialize_fn) => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
            None => {
                sender_builder.add_dfir(
                    parse_quote! {
                        #input_ident -> dest_sink(#sink_expr);
                    },
                    None,
                    Some(send_tag.as_str()),
                );
            }
        }
    }

    /// This backend never emits a fold hook.
    fn emit_fold_hook(
        &mut self,
        _location: &LocationId,
        _in_ident: &syn::Ident,
        _in_kind: &CollectionKind,
        _op_meta: &HydroIrOpMetadata,
    ) -> Option<syn::Ident> {
        None
    }

    /// Pass-through at `location`.
    fn assert_is_consistent(
        &mut self,
        _trusted: bool,
        location: &LocationId,
        in_ident: syn::Ident,
        out_ident: &syn::Ident,
    ) {
        let builder = self.get_dfir_mut(location);
        dfir_passthrough_stmt(builder, &in_ident, out_ident);
    }

    /// Pass-through at `location`.
    fn observe_for_mut(
        &mut self,
        location: &LocationId,
        in_ident: syn::Ident,
        _in_kind: &CollectionKind,
        out_ident: &syn::Ident,
        _op_meta: &HydroIrOpMetadata,
    ) {
        let builder = self.get_dfir_mut(location);
        dfir_passthrough_stmt(builder, &in_ident, out_ident);
    }
}

/// Adds the untagged pass-through statement `out_ident = in_ident;` to
/// `builder`.
#[cfg(feature = "build")]
fn dfir_passthrough_stmt(
    builder: &mut FlatGraphBuilder,
    in_ident: &syn::Ident,
    out_ident: &syn::Ident,
) {
    builder.add_dfir(
        parse_quote! {
            #out_ident = #in_ident;
        },
        None,
        None,
    );
}
905
/// Either a DFIR backend to emit code into, or a pair of callbacks to run on
/// the IR instead.
///
/// NOTE(review): how each variant is consumed is decided by code outside this
/// part of the file; confirm against the users of this type.
#[cfg(feature = "build")]
pub enum BuildersOrCallback<'a, L, N>
where
    L: FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    N: FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
{
    /// Emit DFIR through the given backend.
    Builders(&'a mut dyn DfirBuilder),
    /// Call `L` with roots and `N` with nodes, each together with a statement
    /// id counter.
    Callback(L, N),
}
915
/// A root of the Hydro IR: an operation that consumes an `input` node.
///
/// Every variant carries the consumed `input` and an `op_metadata` record.
/// NOTE(review): per-variant semantics beyond what the fields show are
/// defined by the code generator, which is not visible here.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroRoot {
    /// Runs the closure `f` over the input.
    ForEach {
        f: ClosureExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Sends the input to a port of an external location.
    /// `compile_network` finalizes `instantiate_fn`.
    SendExternal {
        to_external_key: LocationKey,
        to_port_id: ExternalPortId,
        to_many: bool,
        unpaired: bool,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds the input into the sink given by the `sink` expression.
    DestSink {
        sink: DebugExpr,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Feeds the input into the cycle identified by `cycle_id`.
    CycleSink {
        cycle_id: CycleId,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Embedded output registered under `ident` by `compile_network`, which
    /// requires the input to be a stream on a process or cluster.
    EmbeddedOutput {
        #[serde(serialize_with = "serialize_ident")]
        ident: syn::Ident,
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
    /// Consumes the input without any further fields.
    Null {
        input: Box<HydroNode>,
        op_metadata: HydroIrOpMetadata,
    },
}
957
958impl HydroRoot {
959 #[cfg(feature = "build")]
960 #[expect(clippy::too_many_arguments, reason = "TODO(internal)")]
961 pub fn compile_network<'a, D>(
962 &mut self,
963 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
964 seen_tees: &mut SeenSharedNodes,
965 seen_cluster_members: &mut HashSet<(LocationId, LocationKey)>,
966 processes: &SparseSecondaryMap<LocationKey, D::Process>,
967 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
968 externals: &SparseSecondaryMap<LocationKey, D::External>,
969 env: &mut D::InstantiateEnv,
970 ) where
971 D: Deploy<'a>,
972 {
973 let refcell_extra_stmts = RefCell::new(extra_stmts);
974 let refcell_env = RefCell::new(env);
975 let refcell_seen_cluster_members = RefCell::new(seen_cluster_members);
976 self.transform_bottom_up(
977 &mut |l| {
978 if let HydroRoot::SendExternal {
979 #[cfg(feature = "tokio")]
980 input,
981 #[cfg(feature = "tokio")]
982 to_external_key,
983 #[cfg(feature = "tokio")]
984 to_port_id,
985 #[cfg(feature = "tokio")]
986 to_many,
987 #[cfg(feature = "tokio")]
988 unpaired,
989 #[cfg(feature = "tokio")]
990 instantiate_fn,
991 ..
992 } = l
993 {
994 #[cfg(feature = "tokio")]
995 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
996 DebugInstantiate::Building => {
997 let to_node = externals
998 .get(*to_external_key)
999 .unwrap_or_else(|| {
1000 panic!("A external used in the graph was not instantiated: {}", to_external_key)
1001 })
1002 .clone();
1003
1004 match input.metadata().location_id.root() {
1005 &LocationId::Process(process_key) => {
1006 if *to_many {
1007 (
1008 (
1009 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
1010 parse_quote!(DUMMY),
1011 ),
1012 Box::new(|| {}) as Box<dyn FnOnce()>,
1013 )
1014 } else {
1015 let from_node = processes
1016 .get(process_key)
1017 .unwrap_or_else(|| {
1018 panic!("A process used in the graph was not instantiated: {}", process_key)
1019 })
1020 .clone();
1021
1022 let sink_port = from_node.next_port();
1023 let source_port = to_node.next_port();
1024
1025 if *unpaired {
1026 use stageleft::quote_type;
1027 use tokio_util::codec::LengthDelimitedCodec;
1028
1029 to_node.register(*to_port_id, source_port.clone());
1030
1031 let _ = D::e2o_source(
1032 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1033 &to_node, &source_port,
1034 &from_node, &sink_port,
1035 "e_type::<LengthDelimitedCodec>(),
1036 format!("{}_{}", *to_external_key, *to_port_id)
1037 );
1038 }
1039
1040 (
1041 (
1042 D::o2e_sink(
1043 &from_node,
1044 &sink_port,
1045 &to_node,
1046 &source_port,
1047 format!("{}_{}", *to_external_key, *to_port_id)
1048 ),
1049 parse_quote!(DUMMY),
1050 ),
1051 if *unpaired {
1052 D::e2o_connect(
1053 &to_node,
1054 &source_port,
1055 &from_node,
1056 &sink_port,
1057 *to_many,
1058 NetworkHint::Auto,
1059 )
1060 } else {
1061 Box::new(|| {}) as Box<dyn FnOnce()>
1062 },
1063 )
1064 }
1065 }
1066 LocationId::Cluster(cluster_key) => {
1067 let from_node = clusters
1068 .get(*cluster_key)
1069 .unwrap_or_else(|| {
1070 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1071 })
1072 .clone();
1073
1074 let sink_port = from_node.next_port();
1075 let source_port = to_node.next_port();
1076
1077 if *unpaired {
1078 to_node.register(*to_port_id, source_port.clone());
1079 }
1080
1081 (
1082 (
1083 D::m2e_sink(
1084 &from_node,
1085 &sink_port,
1086 &to_node,
1087 &source_port,
1088 format!("{}_{}", *to_external_key, *to_port_id)
1089 ),
1090 parse_quote!(DUMMY),
1091 ),
1092 Box::new(|| {}) as Box<dyn FnOnce()>,
1093 )
1094 }
1095 _ => panic!()
1096 }
1097 },
1098
1099 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1100 };
1101
1102 #[cfg(not(feature = "tokio"))]
1103 {
1104 panic!("Cannot instantiate external inputs without tokio");
1105 };
1106
1107 #[cfg(feature = "tokio")]
1108 {
1109 *instantiate_fn = DebugInstantiateFinalized {
1110 sink: sink_expr,
1111 source: source_expr,
1112 connect_fn: Some(connect_fn),
1113 }
1114 .into();
1115 };
1116 } else if let HydroRoot::EmbeddedOutput { ident, input, .. } = l {
1117 let element_type = match &input.metadata().collection_kind {
1118 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1119 _ => panic!("Embedded output must have Stream collection kind"),
1120 };
1121 let location_key = match input.metadata().location_id.root() {
1122 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1123 _ => panic!("Embedded output must be on a process or cluster"),
1124 };
1125 D::register_embedded_output(
1126 &mut refcell_env.borrow_mut(),
1127 location_key,
1128 ident,
1129 &element_type,
1130 );
1131 }
1132 },
1133 &mut |n| {
1134 if let HydroNode::Network {
1135 name,
1136 networking_info,
1137 input,
1138 instantiate_fn,
1139 metadata,
1140 ..
1141 } = n
1142 {
1143 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
1144 DebugInstantiate::Building => instantiate_network::<D>(
1145 &mut refcell_env.borrow_mut(),
1146 input.metadata().location_id.root(),
1147 metadata.location_id.root(),
1148 processes,
1149 clusters,
1150 name.as_deref(),
1151 networking_info,
1152 ),
1153
1154 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1155 };
1156
1157 *instantiate_fn = DebugInstantiateFinalized {
1158 sink: sink_expr,
1159 source: source_expr,
1160 connect_fn: Some(connect_fn),
1161 }
1162 .into();
1163 } else if let HydroNode::ExternalInput {
1164 from_external_key,
1165 from_port_id,
1166 from_many,
1167 codec_type,
1168 port_hint,
1169 instantiate_fn,
1170 metadata,
1171 ..
1172 } = n
1173 {
1174 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
1175 DebugInstantiate::Building => {
1176 let from_node = externals
1177 .get(*from_external_key)
1178 .unwrap_or_else(|| {
1179 panic!(
1180 "A external used in the graph was not instantiated: {}",
1181 from_external_key,
1182 )
1183 })
1184 .clone();
1185
1186 match metadata.location_id.root() {
1187 &LocationId::Process(process_key) => {
1188 let to_node = processes
1189 .get(process_key)
1190 .unwrap_or_else(|| {
1191 panic!("A process used in the graph was not instantiated: {}", process_key)
1192 })
1193 .clone();
1194
1195 let sink_port = from_node.next_port();
1196 let source_port = to_node.next_port();
1197
1198 from_node.register(*from_port_id, sink_port.clone());
1199
1200 (
1201 (
1202 parse_quote!(DUMMY),
1203 if *from_many {
1204 D::e2o_many_source(
1205 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1206 &to_node, &source_port,
1207 codec_type.0.as_ref(),
1208 format!("{}_{}", *from_external_key, *from_port_id)
1209 )
1210 } else {
1211 D::e2o_source(
1212 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
1213 &from_node, &sink_port,
1214 &to_node, &source_port,
1215 codec_type.0.as_ref(),
1216 format!("{}_{}", *from_external_key, *from_port_id)
1217 )
1218 },
1219 ),
1220 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
1221 )
1222 }
1223 LocationId::Cluster(cluster_key) => {
1224 let to_node = clusters
1225 .get(*cluster_key)
1226 .unwrap_or_else(|| {
1227 panic!("A cluster used in the graph was not instantiated: {}", cluster_key)
1228 })
1229 .clone();
1230
1231 let sink_port = from_node.next_port();
1232 let source_port = to_node.next_port();
1233
1234 from_node.register(*from_port_id, sink_port.clone());
1235
1236 (
1237 (
1238 parse_quote!(DUMMY),
1239 D::e2m_source(
1240 refcell_extra_stmts.borrow_mut().entry(*cluster_key).expect("location was removed").or_default(),
1241 &from_node, &sink_port,
1242 &to_node, &source_port,
1243 codec_type.0.as_ref(),
1244 format!("{}_{}", *from_external_key, *from_port_id)
1245 ),
1246 ),
1247 D::e2m_connect(&from_node, &sink_port, &to_node, &source_port, *port_hint),
1248 )
1249 }
1250 _ => panic!()
1251 }
1252 },
1253
1254 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
1255 };
1256
1257 *instantiate_fn = DebugInstantiateFinalized {
1258 sink: sink_expr,
1259 source: source_expr,
1260 connect_fn: Some(connect_fn),
1261 }
1262 .into();
1263 } else if let HydroNode::Source { source: HydroSource::Embedded(ident), metadata } = n {
1264 let element_type = match &metadata.collection_kind {
1265 CollectionKind::Stream { element_type, .. } => element_type.0.as_ref().clone(),
1266 _ => panic!("Embedded source must have Stream collection kind"),
1267 };
1268 let location_key = match metadata.location_id.root() {
1269 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1270 _ => panic!("Embedded source must be on a process or cluster"),
1271 };
1272 D::register_embedded_stream_input(
1273 &mut refcell_env.borrow_mut(),
1274 location_key,
1275 ident,
1276 &element_type,
1277 );
1278 } else if let HydroNode::Source { source: HydroSource::EmbeddedSingleton(ident), metadata } = n {
1279 let element_type = match &metadata.collection_kind {
1280 CollectionKind::Singleton { element_type, .. } => element_type.0.as_ref().clone(),
1281 _ => panic!("EmbeddedSingleton source must have Singleton collection kind"),
1282 };
1283 let location_key = match metadata.location_id.root() {
1284 LocationId::Process(key) | LocationId::Cluster(key) => *key,
1285 _ => panic!("EmbeddedSingleton source must be on a process or cluster"),
1286 };
1287 D::register_embedded_singleton_input(
1288 &mut refcell_env.borrow_mut(),
1289 location_key,
1290 ident,
1291 &element_type,
1292 );
1293 } else if let HydroNode::Source { source: HydroSource::ClusterMembers(location_id, state), metadata } = n {
1294 match state {
1295 ClusterMembersState::Uninit => {
1296 let at_location = metadata.location_id.root().clone();
1297 let key = (at_location.clone(), location_id.key());
1298 if refcell_seen_cluster_members.borrow_mut().insert(key) {
1299 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
1301 D::cluster_membership_stream(&mut refcell_env.borrow_mut(), &at_location, location_id),
1302 &(),
1303 );
1304 *state = ClusterMembersState::Stream(expr.into());
1305 } else {
1306 *state = ClusterMembersState::Tee(at_location, location_id.clone());
1308 }
1309 }
1310 ClusterMembersState::Stream(_) | ClusterMembersState::Tee(..) => {
1311 panic!("cluster members already finalized");
1312 }
1313 }
1314 }
1315 },
1316 seen_tees,
1317 false,
1318 );
1319 }
1320
1321 pub fn connect_network(&mut self, seen_tees: &mut SeenSharedNodes) {
1322 self.transform_bottom_up(
1323 &mut |l| {
1324 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
1325 match instantiate_fn {
1326 DebugInstantiate::Building => panic!("network not built"),
1327
1328 DebugInstantiate::Finalized(finalized) => {
1329 (finalized.connect_fn.take().unwrap())();
1330 }
1331 }
1332 }
1333 },
1334 &mut |n| {
1335 if let HydroNode::Network { instantiate_fn, .. }
1336 | HydroNode::ExternalInput { instantiate_fn, .. } = n
1337 {
1338 match instantiate_fn {
1339 DebugInstantiate::Building => panic!("network not built"),
1340
1341 DebugInstantiate::Finalized(finalized) => {
1342 (finalized.connect_fn.take().unwrap())();
1343 }
1344 }
1345 }
1346 },
1347 seen_tees,
1348 false,
1349 );
1350 }
1351
1352 pub fn transform_bottom_up(
1353 &mut self,
1354 transform_root: &mut impl FnMut(&mut HydroRoot),
1355 transform_node: &mut impl FnMut(&mut HydroNode),
1356 seen_tees: &mut SeenSharedNodes,
1357 check_well_formed: bool,
1358 ) {
1359 self.transform_children(
1360 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
1361 seen_tees,
1362 );
1363
1364 transform_root(self);
1365 }
1366
1367 pub fn transform_children(
1368 &mut self,
1369 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
1370 seen_tees: &mut SeenSharedNodes,
1371 ) {
1372 match self {
1373 HydroRoot::ForEach { f, input, .. } => {
1374 f.transform_children(&mut transform, seen_tees);
1375 transform(input, seen_tees);
1376 }
1377 HydroRoot::SendExternal { input, .. }
1378 | HydroRoot::DestSink { input, .. }
1379 | HydroRoot::CycleSink { input, .. }
1380 | HydroRoot::EmbeddedOutput { input, .. }
1381 | HydroRoot::Null { input, .. } => {
1382 transform(input, seen_tees);
1383 }
1384 }
1385 }
1386
1387 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroRoot {
1388 match self {
1389 HydroRoot::ForEach {
1390 f,
1391 input,
1392 op_metadata,
1393 } => HydroRoot::ForEach {
1394 f: f.deep_clone(seen_tees),
1395 input: Box::new(input.deep_clone(seen_tees)),
1396 op_metadata: op_metadata.clone(),
1397 },
1398 HydroRoot::SendExternal {
1399 to_external_key,
1400 to_port_id,
1401 to_many,
1402 unpaired,
1403 serialize_fn,
1404 instantiate_fn,
1405 input,
1406 op_metadata,
1407 } => HydroRoot::SendExternal {
1408 to_external_key: *to_external_key,
1409 to_port_id: *to_port_id,
1410 to_many: *to_many,
1411 unpaired: *unpaired,
1412 serialize_fn: serialize_fn.clone(),
1413 instantiate_fn: instantiate_fn.clone(),
1414 input: Box::new(input.deep_clone(seen_tees)),
1415 op_metadata: op_metadata.clone(),
1416 },
1417 HydroRoot::DestSink {
1418 sink,
1419 input,
1420 op_metadata,
1421 } => HydroRoot::DestSink {
1422 sink: sink.clone(),
1423 input: Box::new(input.deep_clone(seen_tees)),
1424 op_metadata: op_metadata.clone(),
1425 },
1426 HydroRoot::CycleSink {
1427 cycle_id,
1428 input,
1429 op_metadata,
1430 } => HydroRoot::CycleSink {
1431 cycle_id: *cycle_id,
1432 input: Box::new(input.deep_clone(seen_tees)),
1433 op_metadata: op_metadata.clone(),
1434 },
1435 HydroRoot::EmbeddedOutput {
1436 ident,
1437 input,
1438 op_metadata,
1439 } => HydroRoot::EmbeddedOutput {
1440 ident: ident.clone(),
1441 input: Box::new(input.deep_clone(seen_tees)),
1442 op_metadata: op_metadata.clone(),
1443 },
1444 HydroRoot::Null { input, op_metadata } => HydroRoot::Null {
1445 input: Box::new(input.deep_clone(seen_tees)),
1446 op_metadata: op_metadata.clone(),
1447 },
1448 }
1449 }
1450
1451 #[cfg(feature = "build")]
1452 pub fn emit(
1453 &mut self,
1454 graph_builders: &mut dyn DfirBuilder,
1455 seen_tees: &mut SeenSharedNodes,
1456 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
1457 next_stmt_id: &mut crate::Counter<StmtId>,
1458 fold_hooked_idents: &mut HashSet<String>,
1459 ) {
1460 self.emit_core(
1461 &mut BuildersOrCallback::<
1462 fn(&mut HydroRoot, &mut crate::Counter<StmtId>),
1463 fn(&mut HydroNode, &mut crate::Counter<StmtId>),
1464 >::Builders(graph_builders),
1465 seen_tees,
1466 built_tees,
1467 next_stmt_id,
1468 fold_hooked_idents,
1469 );
1470 }
1471
    /// Shared driver behind [`HydroRoot::emit`] and `traverse_dfir`.
    ///
    /// For every root variant the input subtree is processed first through
    /// `HydroNode::emit_core`, which yields the identifier of the input stream. After that:
    ///
    /// * in `Builders` mode a DFIR statement for the root is added to the builder that
    ///   belongs to the input's location;
    /// * in `Callback` mode the root callback is invoked with this root and the statement
    ///   counter (except for `CycleSink`, which does nothing in callback mode).
    ///
    /// Every variant except `CycleSink` takes one id from `next_stmt_id` after its input
    /// has been processed.
    ///
    /// # Panics
    /// In `Builders` mode, `ForEach` panics if one of the closure's `singleton_refs` is not
    /// a `HydroNode::Reference` or has no entry in `built_tees`.
    #[cfg(feature = "build")]
    pub fn emit_core(
        &mut self,
        builders_or_callback: &mut BuildersOrCallback<
            impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
            impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
        >,
        seen_tees: &mut SeenSharedNodes,
        built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
        next_stmt_id: &mut crate::Counter<StmtId>,
        fold_hooked_idents: &mut HashSet<String>,
    ) {
        match self {
            // `input -> for_each(f)`
            HydroRoot::ForEach { f, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Identifiers of the already-emitted reference nodes the closure
                        // captures, in `singleton_refs` order.
                        let mut ident_stack: Vec<syn::Ident> = Vec::new();

                        for (ref_node, _is_mut) in f.singleton_refs.iter() {
                            let HydroNode::Reference { inner, .. } = ref_node else {
                                panic!("singleton_refs should only contain HydroNode::Reference");
                            };
                            // `built_tees` is keyed by the address of the shared cell.
                            let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
                            let idents = built_tees.get(&ptr).expect(
                                "ForEach singleton ref not found in built_tees — ref node was not emitted",
                            );
                            // Only the first identifier recorded for the node is used.
                            ident_stack.push(idents[0].clone());
                        }

                        let f_tokens = f.emit_tokens(&mut ident_stack);

                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(#f_tokens);
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            // Output to an external port via `create_external_output`.
            HydroRoot::SendExternal {
                serialize_fn,
                instantiate_fn,
                input,
                ..
            } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Only the sink half is needed for a root; while the network is
                        // still `Building`, placeholder expressions are substituted.
                        let (sink_expr, _) = match instantiate_fn {
                            DebugInstantiate::Building => (
                                syn::parse_quote!(DUMMY_SINK),
                                syn::parse_quote!(DUMMY_SOURCE),
                            ),

                            DebugInstantiate::Finalized(finalized) => {
                                (finalized.sink.clone(), finalized.source.clone())
                            }
                        };

                        graph_builders.create_external_output(
                            &input.metadata().location_id,
                            sink_expr,
                            &input_ident,
                            serialize_fn.as_ref(),
                            stmt_id,
                        );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            // `input -> dest_sink(sink)`
            HydroRoot::DestSink { sink, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> dest_sink(#sink);
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            // `cycle_ident = input -> identity::<T>()`; takes no statement id.
            HydroRoot::CycleSink {
                cycle_id, input, ..
            } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        // Keyed collections flow as `(key, value)` tuples; all other
                        // kinds flow as their element type.
                        let elem_type: syn::Type = match &input.metadata().collection_kind {
                            CollectionKind::KeyedSingleton {
                                key_type,
                                value_type,
                                ..
                            }
                            | CollectionKind::KeyedStream {
                                key_type,
                                value_type,
                                ..
                            } => {
                                parse_quote!((#key_type, #value_type))
                            }
                            CollectionKind::Stream { element_type, .. }
                            | CollectionKind::Singleton { element_type, .. }
                            | CollectionKind::Optional { element_type, .. } => {
                                parse_quote!(#element_type)
                            }
                        };

                        let cycle_id_ident = cycle_id.as_ident();
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #cycle_id_ident = #input_ident -> identity::<#elem_type>();
                                },
                                None,
                                None,
                            );
                    }
                    // Callback mode: the root callback is not invoked for cycle sinks.
                    BuildersOrCallback::Callback(_, _) => {}
                }
            }

            // `input -> for_each(&mut ident)`
            HydroRoot::EmbeddedOutput { ident, input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(&mut #ident);
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }

            // `input -> for_each(|_| {})`: consumes the input and discards every item.
            HydroRoot::Null { input, .. } => {
                let input_ident = input.emit_core(
                    builders_or_callback,
                    seen_tees,
                    built_tees,
                    next_stmt_id,
                    fold_hooked_idents,
                );

                let stmt_id = next_stmt_id.get_and_increment();

                match builders_or_callback {
                    BuildersOrCallback::Builders(graph_builders) => {
                        graph_builders
                            .get_dfir_mut(&input.metadata().location_id)
                            .add_dfir(
                                parse_quote! {
                                    #input_ident -> for_each(|_| {});
                                },
                                None,
                                Some(&stmt_id.to_string()),
                            );
                    }
                    BuildersOrCallback::Callback(leaf_callback, _) => {
                        leaf_callback(self, next_stmt_id);
                    }
                }
            }
        }
    }
1710
1711 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1712 match self {
1713 HydroRoot::ForEach { op_metadata, .. }
1714 | HydroRoot::SendExternal { op_metadata, .. }
1715 | HydroRoot::DestSink { op_metadata, .. }
1716 | HydroRoot::CycleSink { op_metadata, .. }
1717 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1718 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1719 }
1720 }
1721
1722 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1723 match self {
1724 HydroRoot::ForEach { op_metadata, .. }
1725 | HydroRoot::SendExternal { op_metadata, .. }
1726 | HydroRoot::DestSink { op_metadata, .. }
1727 | HydroRoot::CycleSink { op_metadata, .. }
1728 | HydroRoot::EmbeddedOutput { op_metadata, .. }
1729 | HydroRoot::Null { op_metadata, .. } => op_metadata,
1730 }
1731 }
1732
1733 pub fn input(&self) -> &HydroNode {
1734 match self {
1735 HydroRoot::ForEach { input, .. }
1736 | HydroRoot::SendExternal { input, .. }
1737 | HydroRoot::DestSink { input, .. }
1738 | HydroRoot::CycleSink { input, .. }
1739 | HydroRoot::EmbeddedOutput { input, .. }
1740 | HydroRoot::Null { input, .. } => input,
1741 }
1742 }
1743
1744 pub fn input_metadata(&self) -> &HydroIrMetadata {
1745 self.input().metadata()
1746 }
1747
1748 pub fn print_root(&self) -> String {
1749 match self {
1750 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1751 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1752 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1753 HydroRoot::CycleSink { cycle_id, .. } => format!("CycleSink({})", cycle_id),
1754 HydroRoot::EmbeddedOutput { ident, .. } => {
1755 format!("EmbeddedOutput({})", ident)
1756 }
1757 HydroRoot::Null { .. } => "Null".to_owned(),
1758 }
1759 }
1760
1761 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1762 match self {
1763 HydroRoot::ForEach { f, .. } => {
1764 transform(&mut f.expr);
1765 }
1766 HydroRoot::DestSink { sink, .. } => {
1767 transform(sink);
1768 }
1769 HydroRoot::SendExternal { .. }
1770 | HydroRoot::CycleSink { .. }
1771 | HydroRoot::EmbeddedOutput { .. }
1772 | HydroRoot::Null { .. } => {}
1773 }
1774 }
1775}
1776
/// Returns the clock id of the tick that `loc` belongs to, if any.
///
/// `Atomic` wrappers are peeled off until a `Tick` (yielding its id) or a top-level
/// location (yielding `None`) is reached.
#[cfg(feature = "build")]
fn tick_of(loc: &LocationId) -> Option<ClockId> {
    let mut current = loc;
    loop {
        match current {
            LocationId::Tick(id, _) => return Some(*id),
            LocationId::Atomic(inner) => current = inner,
            LocationId::Process(_) | LocationId::Cluster(_) => return None,
        }
    }
}
1785
/// Rewrites every tick clock id found in `loc` to the representative of its set in `uf`.
///
/// The whole nesting chain is walked: each `Tick` has its id replaced by
/// `uf_find(uf, id)`, `Atomic` wrappers are traversed, and the walk stops at a
/// `Process` or `Cluster`.
#[cfg(feature = "build")]
fn remap_location(loc: &mut LocationId, uf: &mut HashMap<ClockId, ClockId>) {
    let nested = match loc {
        LocationId::Process(_) | LocationId::Cluster(_) => return,
        LocationId::Atomic(inner) => inner,
        LocationId::Tick(id, inner) => {
            *id = uf_find(uf, *id);
            inner
        }
    };
    remap_location(nested, uf);
}
1799
/// Union-find lookup: returns the representative of the set containing `x`.
///
/// A clock that is absent from `parent`, or that maps to itself, is its own
/// representative. Every clock visited on the way to the representative is re-pointed
/// directly at it (path compression), so `parent` is updated as a side effect.
#[cfg(feature = "build")]
fn uf_find(parent: &mut HashMap<ClockId, ClockId>, x: ClockId) -> ClockId {
    // First pass: follow parent links up to the representative.
    let mut root = x;
    while let Some(&up) = parent.get(&root) {
        if up == root {
            break;
        }
        root = up;
    }

    // Second pass: point every clock on the path straight at the representative.
    let mut node = x;
    while node != root {
        let Some(up) = parent.insert(node, root) else {
            break;
        };
        node = up;
    }
    root
}
1810
/// Union-find merge: joins the sets containing `a` and `b`.
///
/// When the two representatives differ, the representative of `a` is attached beneath
/// the representative of `b`; otherwise `parent` is left as it is (apart from the path
/// compression performed by the lookups).
#[cfg(feature = "build")]
fn uf_union(parent: &mut HashMap<ClockId, ClockId>, a: ClockId, b: ClockId) {
    let (root_a, root_b) = (uf_find(parent, a), uf_find(parent, b));
    if root_a == root_b {
        return;
    }
    parent.insert(root_a, root_b);
}
1819
/// Unifies the tick clocks that are connected through batching and chaining operators.
///
/// Pass 1 walks the IR and, for every `Batch`, `YieldConcat`, `Chain`, `ChainFirst` and
/// `MergeOrdered` node, merges the tick of each input with the tick of the node itself
/// (only when both locations are inside a tick). Pass 2 rewrites the `location_id` in
/// every node's metadata so that each tick uses the representative clock id of its set.
#[cfg(feature = "build")]
pub fn unify_atomic_ticks(ir: &mut [HydroRoot]) {
    /// Merges the tick clocks of `child` and `parent` when both locations have one.
    fn link(uf: &mut HashMap<ClockId, ClockId>, child: &LocationId, parent: &LocationId) {
        if let (Some(a), Some(b)) = (tick_of(child), tick_of(parent)) {
            uf_union(uf, a, b);
        }
    }

    let mut uf: HashMap<ClockId, ClockId> = HashMap::new();

    // Pass 1: collect the equivalences between tick clocks.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| match node {
            HydroNode::Batch { inner, metadata } | HydroNode::YieldConcat { inner, metadata } => {
                link(&mut uf, &inner.metadata().location_id, &metadata.location_id);
            }
            HydroNode::Chain {
                first,
                second,
                metadata,
            }
            | HydroNode::ChainFirst {
                first,
                second,
                metadata,
            }
            | HydroNode::MergeOrdered {
                first,
                second,
                metadata,
            } => {
                for child in [first, second] {
                    link(&mut uf, &child.metadata().location_id, &metadata.location_id);
                }
            }
            _ => {}
        },
        false,
    );

    // Pass 2: rewrite every node location to use the representative clocks.
    transform_bottom_up(
        ir,
        &mut |_| {},
        &mut |node: &mut HydroNode| {
            let location = &mut node.metadata_mut().location_id;
            remap_location(location, &mut uf);
        },
        false,
    );
}
1883
/// Emits DFIR for every root in `ir` and returns the resulting graph builders, keyed by
/// location.
///
/// The shared-node tables, the statement id counter and the set of fold-hooked
/// identifiers are created once and shared by all roots.
#[cfg(feature = "build")]
pub fn emit(ir: &mut Vec<HydroRoot>) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
    let mut builders: SecondaryMap<LocationKey, FlatGraphBuilder> = SecondaryMap::new();
    let (mut seen_tees, mut built_tees) = (HashMap::new(), HashMap::new());
    let mut fold_hooked_idents = HashSet::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();

    ir.iter_mut().for_each(|root| {
        root.emit(
            &mut builders,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    });

    builders
}
1902
/// Walks `ir` in emission order without generating any DFIR.
///
/// Runs `emit_core` in callback mode on every root, handing `transform_root` and
/// `transform_node` to it together with a statement id counter that starts at its
/// default value and is shared across all roots.
#[cfg(feature = "build")]
pub fn traverse_dfir(
    ir: &mut [HydroRoot],
    transform_root: impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
    transform_node: impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
) {
    let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
    let (mut seen_tees, mut built_tees) = (HashMap::new(), HashMap::new());
    let mut fold_hooked_idents = HashSet::new();
    let mut next_stmt_id = crate::Counter::<StmtId>::default();

    for root in ir.iter_mut() {
        root.emit_core(
            &mut callback,
            &mut seen_tees,
            &mut built_tees,
            &mut next_stmt_id,
            &mut fold_hooked_idents,
        );
    }
}
1924
1925pub fn transform_bottom_up(
1926 ir: &mut [HydroRoot],
1927 transform_root: &mut impl FnMut(&mut HydroRoot),
1928 transform_node: &mut impl FnMut(&mut HydroNode),
1929 check_well_formed: bool,
1930) {
1931 let mut seen_tees = HashMap::new();
1932 ir.iter_mut().for_each(|leaf| {
1933 leaf.transform_bottom_up(
1934 transform_root,
1935 transform_node,
1936 &mut seen_tees,
1937 check_well_formed,
1938 );
1939 });
1940}
1941
1942pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1943 let mut seen_tees = HashMap::new();
1944 ir.iter()
1945 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1946 .collect()
1947}
1948
/// Per-thread deduplication state: the next id to hand out, plus the ids already assigned
/// to shared nodes (keyed by a pointer identifying the node). `None` means that no
/// deduplication scope is currently active.
type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
thread_local! {
    /// State read and updated by `Debug for SharedNode`; installed by `dbg_dedup_tee`.
    static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
    /// State read and updated by `Serialize for SharedNode`; installed by
    /// `serialize_dedup_shared`.
    static SERIALIZED_SHARED: PrintedTees
        = const { RefCell::new(None) };
}
1958
1959pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1960 PRINTED_TEES.with(|printed_tees| {
1961 let mut printed_tees_mut = printed_tees.borrow_mut();
1962 *printed_tees_mut = Some((0, HashMap::new()));
1963 drop(printed_tees_mut);
1964
1965 let ret = f();
1966
1967 let mut printed_tees_mut = printed_tees.borrow_mut();
1968 *printed_tees_mut = None;
1969
1970 ret
1971 })
1972}
1973
1974pub fn serialize_dedup_shared<T>(f: impl FnOnce() -> T) -> T {
1979 let _guard = SerializedSharedGuard::enter();
1980 f()
1981}
1982
1983struct SerializedSharedGuard {
1986 previous: Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>,
1987}
1988
1989impl SerializedSharedGuard {
1990 fn enter() -> Self {
1991 let previous = SERIALIZED_SHARED.with(|cell| {
1992 let mut guard = cell.borrow_mut();
1993 guard.replace((0, HashMap::new()))
1994 });
1995 Self { previous }
1996 }
1997}
1998
1999impl Drop for SerializedSharedGuard {
2000 fn drop(&mut self) {
2001 SERIALIZED_SHARED.with(|cell| {
2002 *cell.borrow_mut() = self.previous.take();
2003 });
2004 }
2005}
2006
/// A reference-counted, interior-mutable handle to a [`HydroNode`], used by the IR
/// variants whose `inner` node can be reached through more than one parent
/// (`Tee`, `Reference`, `Partition`).
pub struct SharedNode(pub Rc<RefCell<HydroNode>>);
2008
2009impl serde::Serialize for SharedNode {
2010 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2021 SERIALIZED_SHARED.with(|cell| {
2022 let mut guard = cell.borrow_mut();
2023 let state = guard.as_mut().ok_or_else(|| {
2025 serde::ser::Error::custom(
2026 "SharedNode serialization requires an active serialize_dedup_shared scope",
2027 )
2028 })?;
2029 let ptr = self.0.as_ptr() as *const RefCell<HydroNode>;
2030
2031 if let Some(&id) = state.1.get(&ptr) {
2032 drop(guard);
2033 use serde::ser::SerializeMap;
2034 let mut map = serializer.serialize_map(Some(1))?;
2035 map.serialize_entry("$shared_ref", &id)?;
2036 map.end()
2037 } else {
2038 let id = state.0;
2039 state.0 += 1;
2040 state.1.insert(ptr, id);
2041 drop(guard);
2042
2043 use serde::ser::SerializeMap;
2044 let mut map = serializer.serialize_map(Some(2))?;
2045 map.serialize_entry("$shared", &id)?;
2046 map.serialize_entry("node", &*self.0.borrow())?;
2047 map.end()
2048 }
2049 })
2050 }
2051}
2052
2053impl SharedNode {
2054 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
2055 Rc::as_ptr(&self.0)
2056 }
2057}
2058
2059impl Debug for SharedNode {
2060 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2061 PRINTED_TEES.with(|printed_tees| {
2062 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
2063 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
2064
2065 if let Some(printed_tees_mut) = printed_tees_mut {
2066 if let Some(existing) = printed_tees_mut
2067 .1
2068 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
2069 {
2070 write!(f, "<shared {}>", existing)
2071 } else {
2072 let next_id = printed_tees_mut.0;
2073 printed_tees_mut.0 += 1;
2074 printed_tees_mut
2075 .1
2076 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
2077 drop(printed_tees_mut_borrow);
2078 write!(f, "<shared {}>: ", next_id)?;
2079 Debug::fmt(&self.0.borrow(), f)
2080 }
2081 } else {
2082 drop(printed_tees_mut_borrow);
2083 write!(f, "<shared>: ")?;
2084 Debug::fmt(&self.0.borrow(), f)
2085 }
2086 })
2087 }
2088}
2089
2090impl Hash for SharedNode {
2091 fn hash<H: Hasher>(&self, state: &mut H) {
2092 self.0.borrow_mut().hash(state);
2093 }
2094}
2095
/// Hands out "access group" numbers.
///
/// A counter is either still counting (interior-mutable, so groups can be handed out
/// through a shared reference) or frozen to one fixed group number.
#[derive(Debug)]
pub enum AccessCounter {
    /// Live counter holding the current group number.
    Counting(Cell<u32>),
    /// Fixed group number; no further groups can be derived from it.
    Frozen(u32),
}

impl AccessCounter {
    /// Creates a live counter starting at group 0.
    pub fn new() -> Self {
        Self::Counting(Cell::new(0))
    }

    /// Returns the group for the next access as a frozen counter.
    ///
    /// A non-mutable access returns the current group and leaves the counter unchanged.
    /// A mutable access returns `current + 1` and advances the counter to `current + 2`,
    /// so the returned group is shared with no access before or after it.
    ///
    /// # Panics
    /// Panics if this counter is already `Frozen`.
    pub fn next_group(&self, is_mut: bool) -> Self {
        let Self::Counting(cell) = self else {
            panic!("Cannot count on `AccessCounter::Frozen`");
        };
        let current = cell.get();
        if is_mut {
            cell.set(current + 2);
            Self::Frozen(current + 1)
        } else {
            Self::Frozen(current)
        }
    }

    /// Returns a frozen snapshot holding this counter's current value.
    pub fn freeze(&self) -> Self {
        let snapshot = match self {
            Self::Frozen(value) => *value,
            Self::Counting(cell) => cell.get(),
        };
        Self::Frozen(snapshot)
    }

    /// Returns the group number of a frozen counter.
    ///
    /// # Panics
    /// Panics if this counter is still `Counting`.
    pub fn frozen_group(&self) -> u32 {
        match self {
            Self::Frozen(value) => *value,
            Self::Counting(_) => panic!("`AccessCounter` not frozen"),
        }
    }
}
2143
2144impl Default for AccessCounter {
2145 fn default() -> Self {
2146 Self::new()
2147 }
2148}
2149
impl Hash for AccessCounter {
    /// Intentionally feeds nothing into the hasher: the counter's value does not
    /// contribute to the hash of the node that contains it.
    fn hash<H: Hasher>(&self, _state: &mut H) {
    }
}
2155
2156impl serde::Serialize for AccessCounter {
2157 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
2158 let count = match self {
2159 AccessCounter::Counting(count) => count.get(),
2160 AccessCounter::Frozen(count) => *count,
2161 };
2162 count.serialize(serializer)
2163 }
2164}
2165
/// Boundedness marker used by the `Stream`, `Optional` and `KeyedStream` variants of
/// [`CollectionKind`].
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum BoundKind {
    Unbounded,
    /// The only value for which `CollectionKind::is_bounded` returns `true`.
    Bounded,
}
2171
/// Ordering marker for stream-like collections.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamOrder {
    NoOrder,
    /// Required, together with `StreamRetry::ExactlyOnce`, for a stream to be "strict"
    /// (see `CollectionKind::is_strict`).
    TotalOrder,
}
2177
/// Delivery marker for stream-like collections.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum StreamRetry {
    AtLeastOnce,
    /// Required, together with `StreamOrder::TotalOrder`, for a stream to be "strict"
    /// (see `CollectionKind::is_strict`).
    ExactlyOnce,
}
2183
/// Boundedness marker used by the `KeyedSingleton` variant of [`CollectionKind`].
///
/// Only `Bounded` makes `CollectionKind::is_bounded` return `true`.
// NOTE(review): the precise guarantees of the intermediate levels are not visible in
// this part of the file — confirm against the keyed-singleton API documentation.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum KeyedSingletonBoundKind {
    Unbounded,
    MonotonicKeys,
    MonotonicValue,
    BoundedValue,
    Bounded,
}
2192
/// Boundedness marker used by the `Singleton` variant of [`CollectionKind`].
///
/// Only `Bounded` makes `CollectionKind::is_bounded` return `true`.
#[derive(serde::Serialize, Clone, PartialEq, Eq, Debug)]
pub enum SingletonBoundKind {
    Unbounded,
    Monotonic,
    Bounded,
}
2199
/// Describes the kind of collection a node produces: its shape, its boundedness, the
/// ordering/retry markers where applicable, and the types of its elements.
#[derive(Clone, PartialEq, Eq, Debug, serde::Serialize)]
pub enum CollectionKind {
    /// A stream of `element_type` values.
    Stream {
        bound: BoundKind,
        order: StreamOrder,
        retry: StreamRetry,
        element_type: DebugType,
    },
    /// A singleton of type `element_type`.
    Singleton {
        bound: SingletonBoundKind,
        element_type: DebugType,
    },
    /// An optional of type `element_type`.
    Optional {
        bound: BoundKind,
        element_type: DebugType,
    },
    /// A keyed stream; the ordering and retry markers apply to the values.
    KeyedStream {
        bound: BoundKind,
        value_order: StreamOrder,
        value_retry: StreamRetry,
        key_type: DebugType,
        value_type: DebugType,
    },
    /// A keyed singleton with `key_type` keys and `value_type` values.
    KeyedSingleton {
        bound: KeyedSingletonBoundKind,
        key_type: DebugType,
        value_type: DebugType,
    },
}
2229
2230impl CollectionKind {
2231 pub fn is_bounded(&self) -> bool {
2232 matches!(
2233 self,
2234 CollectionKind::Stream {
2235 bound: BoundKind::Bounded,
2236 ..
2237 } | CollectionKind::Singleton {
2238 bound: SingletonBoundKind::Bounded,
2239 ..
2240 } | CollectionKind::Optional {
2241 bound: BoundKind::Bounded,
2242 ..
2243 } | CollectionKind::KeyedStream {
2244 bound: BoundKind::Bounded,
2245 ..
2246 } | CollectionKind::KeyedSingleton {
2247 bound: KeyedSingletonBoundKind::Bounded,
2248 ..
2249 }
2250 )
2251 }
2252
2253 pub fn is_strict(&self) -> bool {
2256 match self {
2257 CollectionKind::Stream { order, retry, .. } => {
2258 *order == StreamOrder::TotalOrder && *retry == StreamRetry::ExactlyOnce
2259 }
2260 CollectionKind::KeyedStream {
2261 value_order,
2262 value_retry,
2263 ..
2264 } => {
2265 *value_order == StreamOrder::TotalOrder && *value_retry == StreamRetry::ExactlyOnce
2266 }
2267 CollectionKind::Singleton { .. }
2270 | CollectionKind::Optional { .. }
2271 | CollectionKind::KeyedSingleton { .. } => true,
2272 }
2273 }
2274
2275 pub fn strict_kind(&self) -> CollectionKind {
2277 match self {
2278 CollectionKind::Stream {
2279 bound,
2280 element_type,
2281 ..
2282 } => CollectionKind::Stream {
2283 bound: bound.clone(),
2284 order: StreamOrder::TotalOrder,
2285 retry: StreamRetry::ExactlyOnce,
2286 element_type: element_type.clone(),
2287 },
2288 CollectionKind::KeyedStream {
2289 bound,
2290 key_type,
2291 value_type,
2292 ..
2293 } => CollectionKind::KeyedStream {
2294 bound: bound.clone(),
2295 value_order: StreamOrder::TotalOrder,
2296 value_retry: StreamRetry::ExactlyOnce,
2297 key_type: key_type.clone(),
2298 value_type: value_type.clone(),
2299 },
2300 other => other.clone(),
2301 }
2302 }
2303}
2304
/// Metadata attached to every [`HydroNode`]: where the node lives, what kind of
/// collection it produces, and optional annotations.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrMetadata {
    /// Location of the node's output.
    pub location_id: LocationId,
    /// Kind of collection the node produces.
    pub collection_kind: CollectionKind,
    pub consistency: Option<ClusterConsistency>,
    pub cardinality: Option<usize>,
    pub tag: Option<String>,
    /// Per-operator metadata (backtrace, profiling data, id).
    pub op: HydroIrOpMetadata,
}
2314
// Metadata is deliberately invisible to hashing and equality: hashing contributes
// nothing and any two metadata values compare equal, so comparing or hashing IR nodes
// depends only on their non-metadata fields.
impl Hash for HydroIrMetadata {
    fn hash<H: Hasher>(&self, _: &mut H) {}
}

impl PartialEq for HydroIrMetadata {
    fn eq(&self, _: &Self) -> bool {
        true
    }
}

impl Eq for HydroIrMetadata {}
2327
2328impl Debug for HydroIrMetadata {
2329 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2330 f.debug_struct("HydroIrMetadata")
2331 .field("location_id", &self.location_id)
2332 .field("collection_kind", &self.collection_kind)
2333 .finish()
2334 }
2335}
2336
/// Metadata describing the operator itself, as opposed to its output.
#[derive(Clone, serde::Serialize)]
pub struct HydroIrOpMetadata {
    /// Backtrace captured when the metadata was created; serialized under the key
    /// `span` through `serialize_backtrace_as_span`.
    #[serde(rename = "span", serialize_with = "serialize_backtrace_as_span")]
    pub backtrace: Backtrace,
    /// Unset (`None`) on construction.
    pub cpu_usage: Option<f64>,
    /// Unset (`None`) on construction.
    pub network_recv_cpu_usage: Option<f64>,
    /// Unset (`None`) on construction.
    pub id: Option<usize>,
}
2347
impl HydroIrOpMetadata {
    /// Creates operator metadata with a freshly captured backtrace and with
    /// `cpu_usage`, `network_recv_cpu_usage` and `id` all unset.
    #[expect(
        clippy::new_without_default,
        reason = "explicit calls to new ensure correct backtrace bounds"
    )]
    pub fn new() -> HydroIrOpMetadata {
        Self::new_with_skip(1)
    }

    /// Same as [`HydroIrOpMetadata::new`], but lets the caller pass an extra
    /// `skip_count`, which is added to a base of 2 before being handed to
    /// `Backtrace::get_backtrace`.
    // NOTE(review): presumably the skipped frames are the constructor frames themselves,
    // which would make the captured trace depend on this exact call chain — confirm
    // against `Backtrace::get_backtrace` before restructuring.
    fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
        HydroIrOpMetadata {
            backtrace: Backtrace::get_backtrace(2 + skip_count),
            cpu_usage: None,
            network_recv_cpu_usage: None,
            id: None,
        }
    }
}
2366
2367impl Debug for HydroIrOpMetadata {
2368 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2369 f.debug_struct("HydroIrOpMetadata").finish()
2370 }
2371}
2372
impl Hash for HydroIrOpMetadata {
    /// Intentionally feeds nothing into the hasher: operator metadata does not
    /// contribute to the hash of the node or root that contains it.
    fn hash<H: Hasher>(&self, _: &mut H) {}
}
2376
/// An intermediate node of the Hydro IR.
///
/// Nodes form a graph that is consumed by a `HydroRoot`. Inputs owned by exactly one
/// parent are stored as `Box<HydroNode>`; inputs that can be reached from several
/// parents (`Tee`, `Reference`, `Partition`) are stored as [`SharedNode`]. Every variant
/// except `Placeholder` carries a [`HydroIrMetadata`] describing its output.
#[derive(Debug, Hash, serde::Serialize)]
pub enum HydroNode {
    // NOTE(review): presumably a temporary stand-in used while a node is being moved or
    // rewritten — confirm at the use sites.
    Placeholder,

    Cast {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ObserveNonDet {
        inner: Box<HydroNode>,
        trusted: bool, metadata: HydroIrMetadata,
    },

    /// A source of data, described by a `HydroSource`.
    Source {
        source: HydroSource,
        metadata: HydroIrMetadata,
    },

    SingletonSource {
        value: DebugExpr,
        first_tick_only: bool,
        metadata: HydroIrMetadata,
    },

    /// Read side of a cycle; `HydroRoot::CycleSink` carries the same kind of `cycle_id`.
    CycleSource {
        cycle_id: CycleId,
        metadata: HydroIrMetadata,
    },

    /// One consumer of a shared `inner` node.
    Tee {
        inner: SharedNode,
        metadata: HydroIrMetadata,
    },

    /// A reference to a shared `inner` node, as captured by closures through
    /// `ClosureExpr::singleton_refs`.
    Reference {
        inner: SharedNode,
        kind: crate::handoff_ref::HandoffRefKind,
        access_counter: AccessCounter,
        metadata: HydroIrMetadata,
    },

    // NOTE(review): `is_true` presumably selects which side of the predicate `f` this
    // consumer receives — confirm in the emit code.
    Partition {
        inner: SharedNode,
        f: ClosureExpr,
        is_true: bool,
        metadata: HydroIrMetadata,
    },

    BeginAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    EndAtomic {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Batch {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    YieldConcat {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Chain {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    MergeOrdered {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ChainFirst {
        first: Box<HydroNode>,
        second: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossProduct {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    CrossSingleton {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Join {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    JoinHalf {
        left: Box<HydroNode>,
        right: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Difference {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AntiJoin {
        pos: Box<HydroNode>,
        neg: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    ResolveFutures {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesBlocking {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ResolveFuturesOrdered {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    // Closure-driven unary operators: `f` is applied to the items of `input`.
    Map {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FlatMapStreamBlocking {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Filter {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FilterMap {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    DeferTick {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Enumerate {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    Inspect {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Unique {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Sort {
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    // Stateful operators: `init` produces the initial state and `acc` folds items of
    // `input` into it.
    Fold {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Scan {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ScanAsyncBlocking {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    FoldKeyed {
        init: ClosureExpr,
        acc: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    Reduce {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    ReduceKeyed {
        f: ClosureExpr,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
    /// Like `ReduceKeyed`, with an additional `watermark` input.
    ReduceKeyedWatermark {
        f: ClosureExpr,
        input: Box<HydroNode>,
        watermark: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// A network hop. `instantiate_fn` starts as `Building` and must be `Finalized`
    /// before `HydroRoot::connect_network` is called.
    Network {
        name: Option<String>,
        networking_info: crate::networking::NetworkingInfo,
        serialize_fn: Option<DebugExpr>,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    /// Input arriving from an external port. `port_hint` is excluded from
    /// serialization.
    ExternalInput {
        from_external_key: LocationKey,
        from_port_id: ExternalPortId,
        from_many: bool,
        codec_type: DebugType,
        #[serde(skip)]
        port_hint: NetworkHint,
        instantiate_fn: DebugInstantiate,
        deserialize_fn: Option<DebugExpr>,
        metadata: HydroIrMetadata,
    },

    Counter {
        tag: String,
        duration: DebugExpr,
        prefix: String,
        input: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },

    AssertIsConsistent {
        inner: Box<HydroNode>,
        trusted: bool,
        metadata: HydroIrMetadata,
    },

    UnboundSingleton {
        inner: Box<HydroNode>,
        metadata: HydroIrMetadata,
    },
}
2672
/// Memo table used while walking an IR graph that contains shared nodes.
///
/// The key is the address of the *original* shared `RefCell<HydroNode>`; the value is the
/// `Rc` that replaces it (the transformed or cloned cell). Traversals consult this map so
/// that a shared node reachable through several parents is processed once and every parent
/// ends up pointing at the same replacement.
pub type SeenSharedNodes = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
/// Map from the address of a shared `RefCell<HydroNode>` to a [`LocationId`].
///
/// NOTE(review): no use of this alias is visible in this part of the file; presumably it
/// records the location associated with each shared node — confirm against callers.
pub type SeenSharedNodeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
2675
/// Chooses the identifier an operator using closure `f` should read its input from.
///
/// If `f` reports a mutable reference (`f.has_mut_ref()`) and the input collection kind is
/// not strict (`!in_kind.is_strict()`), a fresh statement id is drawn from `next_stmt_id`
/// and a new `stream_<id>` identifier is returned. When `builders_or_callback` is in builder
/// mode, `observe_for_mut` is additionally called on the builders with the input identifier,
/// its location and kind, the new identifier, and `op_meta`.
///
/// In every other case `in_ident` is handed back unchanged and `next_stmt_id` is left alone.
#[cfg(feature = "build")]
fn maybe_observe_for_mut(
    f: &ClosureExpr,
    in_ident: syn::Ident,
    in_location: &LocationId,
    in_kind: &CollectionKind,
    op_meta: &HydroIrOpMetadata,
    builders_or_callback: &mut BuildersOrCallback<
        impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
        impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
    >,
    next_stmt_id: &mut crate::Counter<StmtId>,
) -> syn::Ident {
    // Guard: without a mutable reference, or with a strict input, the input is used as-is.
    if !f.has_mut_ref() || in_kind.is_strict() {
        return in_ident;
    }

    // The statement id is taken in both builder and callback mode.
    let observe_ident = syn::Ident::new(
        &format!("stream_{}", next_stmt_id.get_and_increment()),
        Span::call_site(),
    );

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            graph_builders.observe_for_mut(in_location, in_ident, in_kind, &observe_ident, op_meta);
        }
        // Callback mode emits nothing here; only the id above is consumed.
        BuildersOrCallback::Callback(..) => {}
    }

    observe_ident
}
2704
2705impl HydroNode {
2706 pub fn transform_bottom_up(
2707 &mut self,
2708 transform: &mut impl FnMut(&mut HydroNode),
2709 seen_tees: &mut SeenSharedNodes,
2710 check_well_formed: bool,
2711 ) {
2712 self.transform_children(
2713 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
2714 seen_tees,
2715 );
2716
2717 transform(self);
2718
2719 let self_location = self.metadata().location_id.root();
2720
2721 if check_well_formed {
2722 match &*self {
2723 HydroNode::Network { .. } => {}
2724 _ => {
2725 self.input_metadata().iter().for_each(|i| {
2726 if i.location_id.root() != self_location {
2727 panic!(
2728 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
2729 i,
2730 i.location_id.root(),
2731 self,
2732 self_location
2733 )
2734 }
2735 });
2736 }
2737 }
2738 }
2739 }
2740
2741 #[inline(always)]
2742 pub fn transform_children(
2743 &mut self,
2744 mut transform: impl FnMut(&mut HydroNode, &mut SeenSharedNodes),
2745 seen_tees: &mut SeenSharedNodes,
2746 ) {
2747 match self {
2748 HydroNode::Placeholder => {
2749 panic!();
2750 }
2751
2752 HydroNode::Source { .. }
2753 | HydroNode::SingletonSource { .. }
2754 | HydroNode::CycleSource { .. }
2755 | HydroNode::ExternalInput { .. } => {}
2756
2757 HydroNode::Tee { inner, .. } | HydroNode::Reference { inner, .. } => {
2758 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2759 *inner = SharedNode(transformed.clone());
2760 } else {
2761 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2762 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2763 let mut orig = inner.0.replace(HydroNode::Placeholder);
2764 transform(&mut orig, seen_tees);
2765 *transformed_cell.borrow_mut() = orig;
2766 *inner = SharedNode(transformed_cell);
2767 }
2768 }
2769
2770 HydroNode::Partition { inner, f, .. } => {
2771 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2772 *inner = SharedNode(transformed.clone());
2773 } else {
2774 f.transform_children(&mut transform, seen_tees);
2775 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
2776 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
2777 let mut orig = inner.0.replace(HydroNode::Placeholder);
2778 transform(&mut orig, seen_tees);
2779 *transformed_cell.borrow_mut() = orig;
2780 *inner = SharedNode(transformed_cell);
2781 }
2782 }
2783
2784 HydroNode::Cast { inner, .. }
2785 | HydroNode::ObserveNonDet { inner, .. }
2786 | HydroNode::BeginAtomic { inner, .. }
2787 | HydroNode::EndAtomic { inner, .. }
2788 | HydroNode::Batch { inner, .. }
2789 | HydroNode::YieldConcat { inner, .. }
2790 | HydroNode::UnboundSingleton { inner, .. }
2791 | HydroNode::AssertIsConsistent { inner, .. } => {
2792 transform(inner.as_mut(), seen_tees);
2793 }
2794
2795 HydroNode::Chain { first, second, .. } => {
2796 transform(first.as_mut(), seen_tees);
2797 transform(second.as_mut(), seen_tees);
2798 }
2799
2800 HydroNode::MergeOrdered { first, second, .. } => {
2801 transform(first.as_mut(), seen_tees);
2802 transform(second.as_mut(), seen_tees);
2803 }
2804
2805 HydroNode::ChainFirst { first, second, .. } => {
2806 transform(first.as_mut(), seen_tees);
2807 transform(second.as_mut(), seen_tees);
2808 }
2809
2810 HydroNode::CrossSingleton { left, right, .. }
2811 | HydroNode::CrossProduct { left, right, .. }
2812 | HydroNode::Join { left, right, .. }
2813 | HydroNode::JoinHalf { left, right, .. } => {
2814 transform(left.as_mut(), seen_tees);
2815 transform(right.as_mut(), seen_tees);
2816 }
2817
2818 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
2819 transform(pos.as_mut(), seen_tees);
2820 transform(neg.as_mut(), seen_tees);
2821 }
2822
2823 HydroNode::Map { f, input, .. } => {
2824 f.transform_children(&mut transform, seen_tees);
2825 transform(input.as_mut(), seen_tees);
2826 }
2827 HydroNode::FlatMap { f, input, .. }
2828 | HydroNode::FlatMapStreamBlocking { f, input, .. }
2829 | HydroNode::Filter { f, input, .. }
2830 | HydroNode::FilterMap { f, input, .. }
2831 | HydroNode::Inspect { f, input, .. }
2832 | HydroNode::Reduce { f, input, .. }
2833 | HydroNode::ReduceKeyed { f, input, .. } => {
2834 f.transform_children(&mut transform, seen_tees);
2835 transform(input.as_mut(), seen_tees);
2836 }
2837 HydroNode::ReduceKeyedWatermark {
2838 f,
2839 input,
2840 watermark,
2841 ..
2842 } => {
2843 f.transform_children(&mut transform, seen_tees);
2844 transform(input.as_mut(), seen_tees);
2845 transform(watermark.as_mut(), seen_tees);
2846 }
2847 HydroNode::Fold {
2848 init, acc, input, ..
2849 }
2850 | HydroNode::Scan {
2851 init, acc, input, ..
2852 }
2853 | HydroNode::ScanAsyncBlocking {
2854 init, acc, input, ..
2855 }
2856 | HydroNode::FoldKeyed {
2857 init, acc, input, ..
2858 } => {
2859 init.transform_children(&mut transform, seen_tees);
2860 acc.transform_children(&mut transform, seen_tees);
2861 transform(input.as_mut(), seen_tees);
2862 }
2863 HydroNode::ResolveFutures { input, .. }
2864 | HydroNode::ResolveFuturesBlocking { input, .. }
2865 | HydroNode::ResolveFuturesOrdered { input, .. }
2866 | HydroNode::Sort { input, .. }
2867 | HydroNode::DeferTick { input, .. }
2868 | HydroNode::Enumerate { input, .. }
2869 | HydroNode::Unique { input, .. }
2870 | HydroNode::Network { input, .. }
2871 | HydroNode::Counter { input, .. } => {
2872 transform(input.as_mut(), seen_tees);
2873 }
2874 }
2875 }
2876
2877 pub fn deep_clone(&self, seen_tees: &mut SeenSharedNodes) -> HydroNode {
2878 match self {
2879 HydroNode::Placeholder => HydroNode::Placeholder,
2880 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
2881 inner: Box::new(inner.deep_clone(seen_tees)),
2882 metadata: metadata.clone(),
2883 },
2884 HydroNode::UnboundSingleton { inner, metadata } => HydroNode::UnboundSingleton {
2885 inner: Box::new(inner.deep_clone(seen_tees)),
2886 metadata: metadata.clone(),
2887 },
2888 HydroNode::ObserveNonDet {
2889 inner,
2890 trusted,
2891 metadata,
2892 } => HydroNode::ObserveNonDet {
2893 inner: Box::new(inner.deep_clone(seen_tees)),
2894 trusted: *trusted,
2895 metadata: metadata.clone(),
2896 },
2897 HydroNode::AssertIsConsistent {
2898 inner,
2899 trusted,
2900 metadata,
2901 } => HydroNode::AssertIsConsistent {
2902 inner: Box::new(inner.deep_clone(seen_tees)),
2903 trusted: *trusted,
2904 metadata: metadata.clone(),
2905 },
2906 HydroNode::Source { source, metadata } => HydroNode::Source {
2907 source: source.clone(),
2908 metadata: metadata.clone(),
2909 },
2910 HydroNode::SingletonSource {
2911 value,
2912 first_tick_only,
2913 metadata,
2914 } => HydroNode::SingletonSource {
2915 value: value.clone(),
2916 first_tick_only: *first_tick_only,
2917 metadata: metadata.clone(),
2918 },
2919 HydroNode::CycleSource { cycle_id, metadata } => HydroNode::CycleSource {
2920 cycle_id: *cycle_id,
2921 metadata: metadata.clone(),
2922 },
2923 HydroNode::Tee { inner, metadata }
2924 | HydroNode::Reference {
2925 inner, metadata, ..
2926 } => {
2927 let cloned_inner = if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2928 SharedNode(transformed.clone())
2929 } else {
2930 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2931 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2932 let cloned = inner.0.borrow().deep_clone(seen_tees);
2933 *new_rc.borrow_mut() = cloned;
2934 SharedNode(new_rc)
2935 };
2936 if let HydroNode::Reference {
2937 kind,
2938 access_counter,
2939 ..
2940 } = self
2941 {
2942 HydroNode::Reference {
2943 inner: cloned_inner,
2944 kind: *kind,
2945 access_counter: access_counter.freeze(),
2946 metadata: metadata.clone(),
2947 }
2948 } else {
2949 HydroNode::Tee {
2950 inner: cloned_inner,
2951 metadata: metadata.clone(),
2952 }
2953 }
2954 }
2955 HydroNode::Partition {
2956 inner,
2957 f,
2958 is_true,
2959 metadata,
2960 } => {
2961 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
2962 HydroNode::Partition {
2963 inner: SharedNode(transformed.clone()),
2964 f: f.deep_clone(seen_tees),
2965 is_true: *is_true,
2966 metadata: metadata.clone(),
2967 }
2968 } else {
2969 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
2970 seen_tees.insert(inner.as_ptr(), new_rc.clone());
2971 let cloned = inner.0.borrow().deep_clone(seen_tees);
2972 *new_rc.borrow_mut() = cloned;
2973 HydroNode::Partition {
2974 inner: SharedNode(new_rc),
2975 f: f.deep_clone(seen_tees),
2976 is_true: *is_true,
2977 metadata: metadata.clone(),
2978 }
2979 }
2980 }
2981 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
2982 inner: Box::new(inner.deep_clone(seen_tees)),
2983 metadata: metadata.clone(),
2984 },
2985 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
2986 inner: Box::new(inner.deep_clone(seen_tees)),
2987 metadata: metadata.clone(),
2988 },
2989 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
2990 inner: Box::new(inner.deep_clone(seen_tees)),
2991 metadata: metadata.clone(),
2992 },
2993 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
2994 inner: Box::new(inner.deep_clone(seen_tees)),
2995 metadata: metadata.clone(),
2996 },
2997 HydroNode::Chain {
2998 first,
2999 second,
3000 metadata,
3001 } => HydroNode::Chain {
3002 first: Box::new(first.deep_clone(seen_tees)),
3003 second: Box::new(second.deep_clone(seen_tees)),
3004 metadata: metadata.clone(),
3005 },
3006 HydroNode::MergeOrdered {
3007 first,
3008 second,
3009 metadata,
3010 } => HydroNode::MergeOrdered {
3011 first: Box::new(first.deep_clone(seen_tees)),
3012 second: Box::new(second.deep_clone(seen_tees)),
3013 metadata: metadata.clone(),
3014 },
3015 HydroNode::ChainFirst {
3016 first,
3017 second,
3018 metadata,
3019 } => HydroNode::ChainFirst {
3020 first: Box::new(first.deep_clone(seen_tees)),
3021 second: Box::new(second.deep_clone(seen_tees)),
3022 metadata: metadata.clone(),
3023 },
3024 HydroNode::CrossProduct {
3025 left,
3026 right,
3027 metadata,
3028 } => HydroNode::CrossProduct {
3029 left: Box::new(left.deep_clone(seen_tees)),
3030 right: Box::new(right.deep_clone(seen_tees)),
3031 metadata: metadata.clone(),
3032 },
3033 HydroNode::CrossSingleton {
3034 left,
3035 right,
3036 metadata,
3037 } => HydroNode::CrossSingleton {
3038 left: Box::new(left.deep_clone(seen_tees)),
3039 right: Box::new(right.deep_clone(seen_tees)),
3040 metadata: metadata.clone(),
3041 },
3042 HydroNode::Join {
3043 left,
3044 right,
3045 metadata,
3046 } => HydroNode::Join {
3047 left: Box::new(left.deep_clone(seen_tees)),
3048 right: Box::new(right.deep_clone(seen_tees)),
3049 metadata: metadata.clone(),
3050 },
3051 HydroNode::JoinHalf {
3052 left,
3053 right,
3054 metadata,
3055 } => HydroNode::JoinHalf {
3056 left: Box::new(left.deep_clone(seen_tees)),
3057 right: Box::new(right.deep_clone(seen_tees)),
3058 metadata: metadata.clone(),
3059 },
3060 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
3061 pos: Box::new(pos.deep_clone(seen_tees)),
3062 neg: Box::new(neg.deep_clone(seen_tees)),
3063 metadata: metadata.clone(),
3064 },
3065 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
3066 pos: Box::new(pos.deep_clone(seen_tees)),
3067 neg: Box::new(neg.deep_clone(seen_tees)),
3068 metadata: metadata.clone(),
3069 },
3070 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
3071 input: Box::new(input.deep_clone(seen_tees)),
3072 metadata: metadata.clone(),
3073 },
3074 HydroNode::ResolveFuturesBlocking { input, metadata } => {
3075 HydroNode::ResolveFuturesBlocking {
3076 input: Box::new(input.deep_clone(seen_tees)),
3077 metadata: metadata.clone(),
3078 }
3079 }
3080 HydroNode::ResolveFuturesOrdered { input, metadata } => {
3081 HydroNode::ResolveFuturesOrdered {
3082 input: Box::new(input.deep_clone(seen_tees)),
3083 metadata: metadata.clone(),
3084 }
3085 }
3086 HydroNode::Map { f, input, metadata } => HydroNode::Map {
3087 f: f.deep_clone(seen_tees),
3088 input: Box::new(input.deep_clone(seen_tees)),
3089 metadata: metadata.clone(),
3090 },
3091 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
3092 f: f.deep_clone(seen_tees),
3093 input: Box::new(input.deep_clone(seen_tees)),
3094 metadata: metadata.clone(),
3095 },
3096 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
3097 HydroNode::FlatMapStreamBlocking {
3098 f: f.deep_clone(seen_tees),
3099 input: Box::new(input.deep_clone(seen_tees)),
3100 metadata: metadata.clone(),
3101 }
3102 }
3103 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
3104 f: f.deep_clone(seen_tees),
3105 input: Box::new(input.deep_clone(seen_tees)),
3106 metadata: metadata.clone(),
3107 },
3108 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
3109 f: f.deep_clone(seen_tees),
3110 input: Box::new(input.deep_clone(seen_tees)),
3111 metadata: metadata.clone(),
3112 },
3113 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
3114 input: Box::new(input.deep_clone(seen_tees)),
3115 metadata: metadata.clone(),
3116 },
3117 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
3118 input: Box::new(input.deep_clone(seen_tees)),
3119 metadata: metadata.clone(),
3120 },
3121 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
3122 f: f.deep_clone(seen_tees),
3123 input: Box::new(input.deep_clone(seen_tees)),
3124 metadata: metadata.clone(),
3125 },
3126 HydroNode::Unique { input, metadata } => HydroNode::Unique {
3127 input: Box::new(input.deep_clone(seen_tees)),
3128 metadata: metadata.clone(),
3129 },
3130 HydroNode::Sort { input, metadata } => HydroNode::Sort {
3131 input: Box::new(input.deep_clone(seen_tees)),
3132 metadata: metadata.clone(),
3133 },
3134 HydroNode::Fold {
3135 init,
3136 acc,
3137 input,
3138 metadata,
3139 } => HydroNode::Fold {
3140 init: init.deep_clone(seen_tees),
3141 acc: acc.deep_clone(seen_tees),
3142 input: Box::new(input.deep_clone(seen_tees)),
3143 metadata: metadata.clone(),
3144 },
3145 HydroNode::Scan {
3146 init,
3147 acc,
3148 input,
3149 metadata,
3150 } => HydroNode::Scan {
3151 init: init.deep_clone(seen_tees),
3152 acc: acc.deep_clone(seen_tees),
3153 input: Box::new(input.deep_clone(seen_tees)),
3154 metadata: metadata.clone(),
3155 },
3156 HydroNode::ScanAsyncBlocking {
3157 init,
3158 acc,
3159 input,
3160 metadata,
3161 } => HydroNode::ScanAsyncBlocking {
3162 init: init.deep_clone(seen_tees),
3163 acc: acc.deep_clone(seen_tees),
3164 input: Box::new(input.deep_clone(seen_tees)),
3165 metadata: metadata.clone(),
3166 },
3167 HydroNode::FoldKeyed {
3168 init,
3169 acc,
3170 input,
3171 metadata,
3172 } => HydroNode::FoldKeyed {
3173 init: init.deep_clone(seen_tees),
3174 acc: acc.deep_clone(seen_tees),
3175 input: Box::new(input.deep_clone(seen_tees)),
3176 metadata: metadata.clone(),
3177 },
3178 HydroNode::ReduceKeyedWatermark {
3179 f,
3180 input,
3181 watermark,
3182 metadata,
3183 } => HydroNode::ReduceKeyedWatermark {
3184 f: f.deep_clone(seen_tees),
3185 input: Box::new(input.deep_clone(seen_tees)),
3186 watermark: Box::new(watermark.deep_clone(seen_tees)),
3187 metadata: metadata.clone(),
3188 },
3189 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
3190 f: f.deep_clone(seen_tees),
3191 input: Box::new(input.deep_clone(seen_tees)),
3192 metadata: metadata.clone(),
3193 },
3194 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
3195 f: f.deep_clone(seen_tees),
3196 input: Box::new(input.deep_clone(seen_tees)),
3197 metadata: metadata.clone(),
3198 },
3199 HydroNode::Network {
3200 name,
3201 networking_info,
3202 serialize_fn,
3203 instantiate_fn,
3204 deserialize_fn,
3205 input,
3206 metadata,
3207 } => HydroNode::Network {
3208 name: name.clone(),
3209 networking_info: networking_info.clone(),
3210 serialize_fn: serialize_fn.clone(),
3211 instantiate_fn: instantiate_fn.clone(),
3212 deserialize_fn: deserialize_fn.clone(),
3213 input: Box::new(input.deep_clone(seen_tees)),
3214 metadata: metadata.clone(),
3215 },
3216 HydroNode::ExternalInput {
3217 from_external_key,
3218 from_port_id,
3219 from_many,
3220 codec_type,
3221 port_hint,
3222 instantiate_fn,
3223 deserialize_fn,
3224 metadata,
3225 } => HydroNode::ExternalInput {
3226 from_external_key: *from_external_key,
3227 from_port_id: *from_port_id,
3228 from_many: *from_many,
3229 codec_type: codec_type.clone(),
3230 port_hint: *port_hint,
3231 instantiate_fn: instantiate_fn.clone(),
3232 deserialize_fn: deserialize_fn.clone(),
3233 metadata: metadata.clone(),
3234 },
3235 HydroNode::Counter {
3236 tag,
3237 duration,
3238 prefix,
3239 input,
3240 metadata,
3241 } => HydroNode::Counter {
3242 tag: tag.clone(),
3243 duration: duration.clone(),
3244 prefix: prefix.clone(),
3245 input: Box::new(input.deep_clone(seen_tees)),
3246 metadata: metadata.clone(),
3247 },
3248 }
3249 }
3250
3251 #[cfg(feature = "build")]
3252 pub fn emit_core(
3253 &mut self,
3254 builders_or_callback: &mut BuildersOrCallback<
3255 impl FnMut(&mut HydroRoot, &mut crate::Counter<StmtId>),
3256 impl FnMut(&mut HydroNode, &mut crate::Counter<StmtId>),
3257 >,
3258 seen_tees: &mut SeenSharedNodes,
3259 built_tees: &mut HashMap<*const RefCell<HydroNode>, Vec<syn::Ident>>,
3260 next_stmt_id: &mut crate::Counter<StmtId>,
3261 fold_hooked_idents: &mut HashSet<String>,
3262 ) -> syn::Ident {
3263 let mut ident_stack: Vec<syn::Ident> = Vec::new();
3264
3265 self.transform_bottom_up(
3266 &mut |node: &mut HydroNode| {
3267 let out_location = node.metadata().location_id.clone();
3268 match node {
3269 HydroNode::Placeholder => {
3270 panic!()
3271 }
3272
3273 HydroNode::Cast { .. } => {
3274 let _ = next_stmt_id.get_and_increment();
3277 match builders_or_callback {
3278 BuildersOrCallback::Builders(_) => {}
3279 BuildersOrCallback::Callback(_, node_callback) => {
3280 node_callback(node, next_stmt_id);
3281 }
3282 }
3283 }
3285
3286 HydroNode::UnboundSingleton { .. } => {
3287 let inner_ident = ident_stack.pop().unwrap();
3288
3289 let stmt_id = next_stmt_id.get_and_increment();
3290 let out_ident =
3291 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3292
3293 match builders_or_callback {
3294 BuildersOrCallback::Builders(graph_builders) => {
3295 if graph_builders.singleton_intermediates() {
3296 let builder = graph_builders.get_dfir_mut(&out_location);
3297 builder.add_dfir(
3298 parse_quote! {
3299 #out_ident = #inner_ident;
3300 },
3301 None,
3302 None,
3303 );
3304 } else {
3305 let builder = graph_builders.get_dfir_mut(&out_location);
3306 builder.add_dfir(
3307 parse_quote! {
3308 #out_ident = #inner_ident -> persist::<'static>();
3309 },
3310 None,
3311 None,
3312 );
3313 }
3314 }
3315 BuildersOrCallback::Callback(_, node_callback) => {
3316 node_callback(node, next_stmt_id);
3317 }
3318 }
3319
3320 ident_stack.push(out_ident);
3321 }
3322
3323 HydroNode::AssertIsConsistent { inner, trusted, .. } => {
3324 let inner_ident = ident_stack.pop().unwrap();
3325
3326 let stmt_id = next_stmt_id.get_and_increment();
3327 let out_ident =
3328 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3329
3330 match builders_or_callback {
3331 BuildersOrCallback::Builders(graph_builders) => {
3332 graph_builders.assert_is_consistent(
3333 *trusted,
3334 &inner.metadata().location_id,
3335 inner_ident,
3336 &out_ident,
3337 );
3338 }
3339 BuildersOrCallback::Callback(_, node_callback) => {
3340 node_callback(node, next_stmt_id);
3341 }
3342 }
3343
3344 ident_stack.push(out_ident);
3345 }
3346
3347 HydroNode::ObserveNonDet {
3348 inner,
3349 trusted,
3350 metadata,
3351 ..
3352 } => {
3353 let inner_ident = ident_stack.pop().unwrap();
3354
3355 let stmt_id = next_stmt_id.get_and_increment();
3356 let observe_ident =
3357 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3358
3359 match builders_or_callback {
3360 BuildersOrCallback::Builders(graph_builders) => {
3361 graph_builders.observe_nondet(
3362 *trusted,
3363 &inner.metadata().location_id,
3364 inner_ident,
3365 &inner.metadata().collection_kind,
3366 &observe_ident,
3367 &metadata.collection_kind,
3368 &metadata.op,
3369 );
3370 }
3371 BuildersOrCallback::Callback(_, node_callback) => {
3372 node_callback(node, next_stmt_id);
3373 }
3374 }
3375
3376 ident_stack.push(observe_ident);
3377 }
3378
3379 HydroNode::Batch {
3380 inner, metadata, ..
3381 } => {
3382 let inner_ident = ident_stack.pop().unwrap();
3383
3384 let stmt_id = next_stmt_id.get_and_increment();
3385 let batch_ident =
3386 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3387
3388 match builders_or_callback {
3389 BuildersOrCallback::Builders(graph_builders) => {
3390 graph_builders.batch(
3391 inner_ident,
3392 &inner.metadata().location_id,
3393 &inner.metadata().collection_kind,
3394 &batch_ident,
3395 &out_location,
3396 &metadata.op,
3397 fold_hooked_idents,
3398 );
3399 }
3400 BuildersOrCallback::Callback(_, node_callback) => {
3401 node_callback(node, next_stmt_id);
3402 }
3403 }
3404
3405 ident_stack.push(batch_ident);
3406 }
3407
3408 HydroNode::YieldConcat { inner, .. } => {
3409 let inner_ident = ident_stack.pop().unwrap();
3410
3411 let stmt_id = next_stmt_id.get_and_increment();
3412 let yield_ident =
3413 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3414
3415 match builders_or_callback {
3416 BuildersOrCallback::Builders(graph_builders) => {
3417 graph_builders.yield_from_tick(
3418 inner_ident,
3419 &inner.metadata().location_id,
3420 &inner.metadata().collection_kind,
3421 &yield_ident,
3422 &out_location,
3423 );
3424 }
3425 BuildersOrCallback::Callback(_, node_callback) => {
3426 node_callback(node, next_stmt_id);
3427 }
3428 }
3429
3430 ident_stack.push(yield_ident);
3431 }
3432
3433 HydroNode::BeginAtomic { inner, metadata } => {
3434 let inner_ident = ident_stack.pop().unwrap();
3435
3436 let stmt_id = next_stmt_id.get_and_increment();
3437 let begin_ident =
3438 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3439
3440 match builders_or_callback {
3441 BuildersOrCallback::Builders(graph_builders) => {
3442 graph_builders.begin_atomic(
3443 inner_ident,
3444 &inner.metadata().location_id,
3445 &inner.metadata().collection_kind,
3446 &begin_ident,
3447 &out_location,
3448 &metadata.op,
3449 );
3450 }
3451 BuildersOrCallback::Callback(_, node_callback) => {
3452 node_callback(node, next_stmt_id);
3453 }
3454 }
3455
3456 ident_stack.push(begin_ident);
3457 }
3458
3459 HydroNode::EndAtomic { inner, .. } => {
3460 let inner_ident = ident_stack.pop().unwrap();
3461
3462 let stmt_id = next_stmt_id.get_and_increment();
3463 let end_ident =
3464 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3465
3466 match builders_or_callback {
3467 BuildersOrCallback::Builders(graph_builders) => {
3468 graph_builders.end_atomic(
3469 inner_ident,
3470 &inner.metadata().location_id,
3471 &inner.metadata().collection_kind,
3472 &end_ident,
3473 );
3474 }
3475 BuildersOrCallback::Callback(_, node_callback) => {
3476 node_callback(node, next_stmt_id);
3477 }
3478 }
3479
3480 ident_stack.push(end_ident);
3481 }
3482
3483 HydroNode::Source {
3484 source, metadata, ..
3485 } => {
3486 if let HydroSource::ExternalNetwork() = source {
3487 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
3488 } else {
3489 let stmt_id = next_stmt_id.get_and_increment();
3490 let source_ident =
3491 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3492
3493 let source_stmt = match source {
3494 HydroSource::Stream(expr) => {
3495 debug_assert!(metadata.location_id.is_top_level());
3496 parse_quote! {
3497 #source_ident = source_stream(#expr);
3498 }
3499 }
3500
3501 HydroSource::ExternalNetwork() => {
3502 unreachable!()
3503 }
3504
3505 HydroSource::Iter(expr) => {
3506 if metadata.location_id.is_top_level() {
3507 parse_quote! {
3508 #source_ident = source_iter(#expr);
3509 }
3510 } else {
3511 parse_quote! {
3513 #source_ident = source_iter(#expr) -> persist::<'static>();
3514 }
3515 }
3516 }
3517
3518 HydroSource::Spin() => {
3519 debug_assert!(metadata.location_id.is_top_level());
3520 parse_quote! {
3521 #source_ident = spin();
3522 }
3523 }
3524
3525 HydroSource::ClusterMembers(target_loc, state) => {
3526 debug_assert!(metadata.location_id.is_top_level());
3527
3528 let members_tee_ident = syn::Ident::new(
3529 &format!(
3530 "__cluster_members_tee_{}_{}",
3531 metadata.location_id.root().key(),
3532 target_loc.key(),
3533 ),
3534 Span::call_site(),
3535 );
3536
3537 match state {
3538 ClusterMembersState::Stream(d) => {
3539 parse_quote! {
3540 #members_tee_ident = source_stream(#d) -> tee();
3541 #source_ident = #members_tee_ident;
3542 }
3543 },
3544 ClusterMembersState::Uninit => syn::parse_quote! {
3545 #source_ident = source_stream(DUMMY);
3546 },
3547 ClusterMembersState::Tee(..) => parse_quote! {
3548 #source_ident = #members_tee_ident;
3549 },
3550 }
3551 }
3552
3553 HydroSource::Embedded(ident) => {
3554 parse_quote! {
3555 #source_ident = source_stream(#ident);
3556 }
3557 }
3558
3559 HydroSource::EmbeddedSingleton(ident) => {
3560 parse_quote! {
3561 #source_ident = source_iter([#ident]);
3562 }
3563 }
3564 };
3565
3566 match builders_or_callback {
3567 BuildersOrCallback::Builders(graph_builders) => {
3568 let builder = graph_builders.get_dfir_mut(&out_location);
3569 builder.add_dfir(source_stmt, None, Some(&stmt_id.to_string()));
3570 }
3571 BuildersOrCallback::Callback(_, node_callback) => {
3572 node_callback(node, next_stmt_id);
3573 }
3574 }
3575
3576 ident_stack.push(source_ident);
3577 }
3578 }
3579
3580 HydroNode::SingletonSource { value, first_tick_only, metadata } => {
3581 let stmt_id = next_stmt_id.get_and_increment();
3582 let source_ident =
3583 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3584
3585 match builders_or_callback {
3586 BuildersOrCallback::Builders(graph_builders) => {
3587 let builder = graph_builders.get_dfir_mut(&out_location);
3588
3589 if *first_tick_only {
3590 assert!(
3591 !metadata.location_id.is_top_level(),
3592 "first_tick_only SingletonSource must be inside a tick"
3593 );
3594 }
3595
3596 if *first_tick_only
3597 || (metadata.location_id.is_top_level()
3598 && metadata.collection_kind.is_bounded())
3599 {
3600 builder.add_dfir(
3601 parse_quote! {
3602 #source_ident = source_iter([#value]);
3603 },
3604 None,
3605 Some(&stmt_id.to_string()),
3606 );
3607 } else {
3608 builder.add_dfir(
3609 parse_quote! {
3610 #source_ident = source_iter([#value]) -> persist::<'static>();
3611 },
3612 None,
3613 Some(&stmt_id.to_string()),
3614 );
3615 }
3616 }
3617 BuildersOrCallback::Callback(_, node_callback) => {
3618 node_callback(node, next_stmt_id);
3619 }
3620 }
3621
3622 ident_stack.push(source_ident);
3623 }
3624
3625 HydroNode::CycleSource { cycle_id, .. } => {
3626 let ident = cycle_id.as_ident();
3627
3628 let _ = next_stmt_id.get_and_increment();
3630
3631 match builders_or_callback {
3632 BuildersOrCallback::Builders(_) => {}
3633 BuildersOrCallback::Callback(_, node_callback) => {
3634 node_callback(node, next_stmt_id);
3635 }
3636 }
3637
3638 ident_stack.push(ident);
3639 }
3640
3641 HydroNode::Tee { inner, .. } => {
3642 let stmt_id = next_stmt_id.get_and_increment();
3645
3646 let ret_ident = if let Some(built_idents) =
3647 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3648 {
3649 match builders_or_callback {
3650 BuildersOrCallback::Builders(_) => {}
3651 BuildersOrCallback::Callback(_, node_callback) => {
3652 node_callback(node, next_stmt_id);
3653 }
3654 }
3655
3656 built_idents[0].clone()
3657 } else {
3658 let inner_ident = ident_stack.pop().unwrap();
3661
3662 let tee_ident =
3663 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3664
3665 built_tees.insert(
3666 inner.0.as_ref() as *const RefCell<HydroNode>,
3667 vec![tee_ident.clone()],
3668 );
3669
3670 match builders_or_callback {
3671 BuildersOrCallback::Builders(graph_builders) => {
3672 if fold_hooked_idents.contains(&inner_ident.to_string()) {
3684 fold_hooked_idents.insert(tee_ident.to_string());
3685 }
3686 let builder = graph_builders.get_dfir_mut(&out_location);
3687 builder.add_dfir(
3688 parse_quote! {
3689 #tee_ident = #inner_ident -> tee();
3690 },
3691 None,
3692 Some(&stmt_id.to_string()),
3693 );
3694 }
3695 BuildersOrCallback::Callback(_, node_callback) => {
3696 node_callback(node, next_stmt_id);
3697 }
3698 }
3699
3700 tee_ident
3701 };
3702
3703 ident_stack.push(ret_ident);
3704 }
3705
3706 HydroNode::Reference { inner, kind, .. } => {
3707 let stmt_id = next_stmt_id.get_and_increment();
3710
3711 let ret_ident = if let Some(built_idents) =
3712 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
3713 {
3714 built_idents[0].clone()
3715 } else {
3716 let inner_ident = ident_stack.pop().unwrap();
3717
3718 let ref_ident =
3719 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3720
3721 built_tees.insert(
3722 inner.0.as_ref() as *const RefCell<HydroNode>,
3723 vec![ref_ident.clone()],
3724 );
3725
3726 match builders_or_callback {
3727 BuildersOrCallback::Builders(graph_builders) => {
3728 let builder = graph_builders.get_dfir_mut(&out_location);
3729 let op_ident = syn::Ident::new(
3730 match kind {
3731 crate::handoff_ref::HandoffRefKind::Singleton => "singleton",
3732 crate::handoff_ref::HandoffRefKind::Optional => "optional",
3733 crate::handoff_ref::HandoffRefKind::Vec => "handoff",
3734 },
3735 Span::call_site(),
3736 );
3737 builder.add_dfir(
3738 parse_quote! {
3739 #ref_ident = #inner_ident -> #op_ident();
3740 },
3741 None,
3742 Some(&stmt_id.to_string()),
3743 );
3744 }
3745 BuildersOrCallback::Callback(_, node_callback) => {
3746 node_callback(node, next_stmt_id);
3747 }
3748 }
3749
3750 ref_ident
3751 };
3752
3753 ident_stack.push(ret_ident);
3754 }
3755
// Emits (at most once per shared inner node) a DFIR `partition` with two outputs and
// pushes the ident of the output selected by `is_true`.
HydroNode::Partition {
    inner, f, is_true, metadata,
} => {
    // Copy the flag out and compute the cache key (address of the shared inner cell) up front.
    let is_true = *is_true; let ptr = inner.0.as_ref() as *const RefCell<HydroNode>;
    // A statement id is consumed on every visit, including cache hits.
    let stmt_id = next_stmt_id.get_and_increment();

    let ret_ident = if let Some(built_idents) = built_tees.get(&ptr) {
        // The partition was already emitted by the sibling branch; only the callback
        // (if any) is notified, and no DFIR is added.
        match builders_or_callback {
            BuildersOrCallback::Builders(_) => {}
            BuildersOrCallback::Callback(_, node_callback) => {
                node_callback(node, next_stmt_id);
            }
        }

        // Cached idents are stored as [true_ident, false_ident].
        let idx = if is_true { 0 } else { 1 };
        built_idents[idx].clone()
    } else {
        let inner_ident = ident_stack.pop().unwrap();
        // `emit_tokens` is handed the ident stack; presumably it consumes idents for the
        // closure's singleton references - TODO confirm.
        let f_tokens = f.emit_tokens(&mut ident_stack);

        // The inner node's metadata is read through a scoped `RefCell` borrow that ends
        // before anything else touches `inner`.
        let inner_ident = {
            let inner_borrow = inner.0.borrow();
            maybe_observe_for_mut(
                f, inner_ident,
                &inner_borrow.metadata().location_id,
                &inner_borrow.metadata().collection_kind,
                &metadata.op,
                builders_or_callback, next_stmt_id,
            )
        };

        let partition_ident = syn::Ident::new(
            &format!("stream_{}_partition", stmt_id),
            Span::call_site(),
        );
        let true_ident = syn::Ident::new(
            &format!("stream_{}_true", stmt_id),
            Span::call_site(),
        );
        let false_ident = syn::Ident::new(
            &format!("stream_{}_false", stmt_id),
            Span::call_site(),
        );

        built_tees.insert(
            ptr,
            vec![true_ident.clone(), false_ident.clone()],
        );

        // NOTE(review): a second statement id is consumed here and shadows the first.
        // The idents above are named after the first id, while the DFIR statement below
        // is tagged with this one - confirm that this is intentional.
        let stmt_id = next_stmt_id.get_and_increment();
        match builders_or_callback {
            BuildersOrCallback::Builders(graph_builders) => {
                let builder = graph_builders.get_dfir_mut(&out_location);
                builder.add_dfir(
                    parse_quote! {
                        #partition_ident = #inner_ident -> partition(|__item, __num_outputs| if (#f_tokens)(__item) { 0_usize } else { 1_usize });
                        #true_ident = #partition_ident[0];
                        #false_ident = #partition_ident[1];
                    },
                    None,
                    Some(&stmt_id.to_string()),
                );
            }
            BuildersOrCallback::Callback(_, node_callback) => {
                node_callback(node, next_stmt_id);
            }
        }

        if is_true { true_ident } else { false_ident }
    };

    ident_stack.push(ret_ident);
}
3832
3833 HydroNode::Chain { .. } => {
3834 let second_ident = ident_stack.pop().unwrap();
3836 let first_ident = ident_stack.pop().unwrap();
3837
3838 let stmt_id = next_stmt_id.get_and_increment();
3839 let chain_ident =
3840 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3841
3842 match builders_or_callback {
3843 BuildersOrCallback::Builders(graph_builders) => {
3844 let builder = graph_builders.get_dfir_mut(&out_location);
3845 builder.add_dfir(
3846 parse_quote! {
3847 #chain_ident = chain();
3848 #first_ident -> [0]#chain_ident;
3849 #second_ident -> [1]#chain_ident;
3850 },
3851 None,
3852 Some(&stmt_id.to_string()),
3853 );
3854 }
3855 BuildersOrCallback::Callback(_, node_callback) => {
3856 node_callback(node, next_stmt_id);
3857 }
3858 }
3859
3860 ident_stack.push(chain_ident);
3861 }
3862
3863 HydroNode::MergeOrdered { first, metadata, .. } => {
3864 let second_ident = ident_stack.pop().unwrap();
3865 let first_ident = ident_stack.pop().unwrap();
3866
3867 let stmt_id = next_stmt_id.get_and_increment();
3868 let merge_ident =
3869 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3870
3871 match builders_or_callback {
3872 BuildersOrCallback::Builders(graph_builders) => {
3873 graph_builders.merge_ordered(
3874 &first.metadata().location_id,
3875 first_ident,
3876 second_ident,
3877 &merge_ident,
3878 &first.metadata().collection_kind,
3879 &metadata.op,
3880 Some(&stmt_id.to_string()),
3881 );
3882 }
3883 BuildersOrCallback::Callback(_, node_callback) => {
3884 node_callback(node, next_stmt_id);
3885 }
3886 }
3887
3888 ident_stack.push(merge_ident);
3889 }
3890
3891 HydroNode::ChainFirst { .. } => {
3892 let second_ident = ident_stack.pop().unwrap();
3893 let first_ident = ident_stack.pop().unwrap();
3894
3895 let stmt_id = next_stmt_id.get_and_increment();
3896 let chain_ident =
3897 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3898
3899 match builders_or_callback {
3900 BuildersOrCallback::Builders(graph_builders) => {
3901 let builder = graph_builders.get_dfir_mut(&out_location);
3902 builder.add_dfir(
3903 parse_quote! {
3904 #chain_ident = chain_first_n(1);
3905 #first_ident -> [0]#chain_ident;
3906 #second_ident -> [1]#chain_ident;
3907 },
3908 None,
3909 Some(&stmt_id.to_string()),
3910 );
3911 }
3912 BuildersOrCallback::Callback(_, node_callback) => {
3913 node_callback(node, next_stmt_id);
3914 }
3915 }
3916
3917 ident_stack.push(chain_ident);
3918 }
3919
3920 HydroNode::CrossSingleton { right, .. } => {
3921 let right_ident = ident_stack.pop().unwrap();
3922 let left_ident = ident_stack.pop().unwrap();
3923
3924 let stmt_id = next_stmt_id.get_and_increment();
3925 let cross_ident =
3926 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
3927
3928 match builders_or_callback {
3929 BuildersOrCallback::Builders(graph_builders) => {
3930 let builder = graph_builders.get_dfir_mut(&out_location);
3931
3932 if right.metadata().location_id.is_top_level()
3933 && right.metadata().collection_kind.is_bounded()
3934 {
3935 builder.add_dfir(
3936 parse_quote! {
3937 #cross_ident = cross_singleton::<'static>();
3938 #left_ident -> [input]#cross_ident;
3939 #right_ident -> [single]#cross_ident;
3940 },
3941 None,
3942 Some(&stmt_id.to_string()),
3943 );
3944 } else {
3945 builder.add_dfir(
3946 parse_quote! {
3947 #cross_ident = cross_singleton();
3948 #left_ident -> [input]#cross_ident;
3949 #right_ident -> [single]#cross_ident;
3950 },
3951 None,
3952 Some(&stmt_id.to_string()),
3953 );
3954 }
3955 }
3956 BuildersOrCallback::Callback(_, node_callback) => {
3957 node_callback(node, next_stmt_id);
3958 }
3959 }
3960
3961 ident_stack.push(cross_ident);
3962 }
3963
// Emits `cross_join_multiset` (for `CrossProduct`) or `join_multiset` (for `Join`),
// with a per-side persistence lifetime derived from each input's location.
HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
    let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
        parse_quote!(cross_join_multiset)
    } else {
        parse_quote!(join_multiset)
    };

    // Both variants expose `left`/`right`; the `else` branch cannot be reached because
    // the arm pattern already restricts `node` to these two variants.
    let (HydroNode::CrossProduct { left, right, .. }
    | HydroNode::Join { left, right, .. }) = node
    else {
        unreachable!()
    };

    // True only when *both* inputs live at the top level.
    let is_top_level = left.metadata().location_id.is_top_level()
        && right.metadata().location_id.is_top_level();
    // Top-level inputs use `'static` persistence, all others `'tick`.
    let left_lifetime = if left.metadata().location_id.is_top_level() {
        quote!('static)
    } else {
        quote!('tick)
    };

    let right_lifetime = if right.metadata().location_id.is_top_level() {
        quote!('static)
    } else {
        quote!('tick)
    };

    // The right operand is on top of the stack, the left one beneath it.
    let right_ident = ident_stack.pop().unwrap();
    let left_ident = ident_stack.pop().unwrap();

    let stmt_id = next_stmt_id.get_and_increment();
    let stream_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            let builder = graph_builders.get_dfir_mut(&out_location);
            builder.add_dfir(
                if is_top_level {
                    // When both sides are top-level the join output is additionally piped
                    // through `multiset_delta()`.
                    // NOTE(review): presumably so only newly produced pairs are emitted
                    // each tick - confirm against the DFIR operator docs.
                    parse_quote! {
                        #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
                        #left_ident -> [0]#stream_ident;
                        #right_ident -> [1]#stream_ident;
                    }
                } else {
                    parse_quote! {
                        #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
                        #left_ident -> [0]#stream_ident;
                        #right_ident -> [1]#stream_ident;
                    }
                }
                ,
                None,
                Some(&stmt_id.to_string()),
            );
        }
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(stream_ident);
}
4029
4030 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
4031 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
4032 parse_quote!(difference)
4033 } else {
4034 parse_quote!(anti_join)
4035 };
4036
4037 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
4038 node
4039 else {
4040 unreachable!()
4041 };
4042
4043 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
4044 quote!('static)
4045 } else {
4046 quote!('tick)
4047 };
4048
4049 let neg_ident = ident_stack.pop().unwrap();
4050 let pos_ident = ident_stack.pop().unwrap();
4051
4052 let stmt_id = next_stmt_id.get_and_increment();
4053 let stream_ident =
4054 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4055
4056 match builders_or_callback {
4057 BuildersOrCallback::Builders(graph_builders) => {
4058 let builder = graph_builders.get_dfir_mut(&out_location);
4059 builder.add_dfir(
4060 parse_quote! {
4061 #stream_ident = #operator::<'tick, #neg_lifetime>();
4062 #pos_ident -> [pos]#stream_ident;
4063 #neg_ident -> [neg]#stream_ident;
4064 },
4065 None,
4066 Some(&stmt_id.to_string()),
4067 );
4068 }
4069 BuildersOrCallback::Callback(_, node_callback) => {
4070 node_callback(node, next_stmt_id);
4071 }
4072 }
4073
4074 ident_stack.push(stream_ident);
4075 }
4076
4077 HydroNode::JoinHalf { .. } => {
4078 let HydroNode::JoinHalf { right, .. } = node else {
4079 unreachable!()
4080 };
4081
4082 assert!(
4083 right.metadata().collection_kind.is_bounded(),
4084 "JoinHalf requires the right (build) side to be Bounded, got {:?}",
4085 right.metadata().collection_kind
4086 );
4087
4088 let build_lifetime = if right.metadata().location_id.is_top_level() {
4089 quote!('static)
4090 } else {
4091 quote!('tick)
4092 };
4093
4094 let build_ident = ident_stack.pop().unwrap();
4095 let probe_ident = ident_stack.pop().unwrap();
4096
4097 let stmt_id = next_stmt_id.get_and_increment();
4098 let stream_ident =
4099 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4100
4101 match builders_or_callback {
4102 BuildersOrCallback::Builders(graph_builders) => {
4103 let builder = graph_builders.get_dfir_mut(&out_location);
4104 builder.add_dfir(
4105 parse_quote! {
4106 #stream_ident = join_multiset_half::<#build_lifetime, 'tick>();
4107 #probe_ident -> [probe]#stream_ident;
4108 #build_ident -> [build]#stream_ident;
4109 },
4110 None,
4111 Some(&stmt_id.to_string()),
4112 );
4113 }
4114 BuildersOrCallback::Callback(_, node_callback) => {
4115 node_callback(node, next_stmt_id);
4116 }
4117 }
4118
4119 ident_stack.push(stream_ident);
4120 }
4121
4122 HydroNode::ResolveFutures { .. } => {
4123 let input_ident = ident_stack.pop().unwrap();
4124
4125 let stmt_id = next_stmt_id.get_and_increment();
4126 let futures_ident =
4127 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4128
4129 match builders_or_callback {
4130 BuildersOrCallback::Builders(graph_builders) => {
4131 let builder = graph_builders.get_dfir_mut(&out_location);
4132 builder.add_dfir(
4133 parse_quote! {
4134 #futures_ident = #input_ident -> resolve_futures();
4135 },
4136 None,
4137 Some(&stmt_id.to_string()),
4138 );
4139 }
4140 BuildersOrCallback::Callback(_, node_callback) => {
4141 node_callback(node, next_stmt_id);
4142 }
4143 }
4144
4145 ident_stack.push(futures_ident);
4146 }
4147
4148 HydroNode::ResolveFuturesBlocking { .. } => {
4149 let input_ident = ident_stack.pop().unwrap();
4150
4151 let stmt_id = next_stmt_id.get_and_increment();
4152 let futures_ident =
4153 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4154
4155 match builders_or_callback {
4156 BuildersOrCallback::Builders(graph_builders) => {
4157 let builder = graph_builders.get_dfir_mut(&out_location);
4158 builder.add_dfir(
4159 parse_quote! {
4160 #futures_ident = #input_ident -> resolve_futures_blocking();
4161 },
4162 None,
4163 Some(&stmt_id.to_string()),
4164 );
4165 }
4166 BuildersOrCallback::Callback(_, node_callback) => {
4167 node_callback(node, next_stmt_id);
4168 }
4169 }
4170
4171 ident_stack.push(futures_ident);
4172 }
4173
4174 HydroNode::ResolveFuturesOrdered { .. } => {
4175 let input_ident = ident_stack.pop().unwrap();
4176
4177 let stmt_id = next_stmt_id.get_and_increment();
4178 let futures_ident =
4179 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4180
4181 match builders_or_callback {
4182 BuildersOrCallback::Builders(graph_builders) => {
4183 let builder = graph_builders.get_dfir_mut(&out_location);
4184 builder.add_dfir(
4185 parse_quote! {
4186 #futures_ident = #input_ident -> resolve_futures_ordered();
4187 },
4188 None,
4189 Some(&stmt_id.to_string()),
4190 );
4191 }
4192 BuildersOrCallback::Callback(_, node_callback) => {
4193 node_callback(node, next_stmt_id);
4194 }
4195 }
4196
4197 ident_stack.push(futures_ident);
4198 }
4199
4200 HydroNode::Map {
4201 f,
4202 input,
4203 metadata,
4204 } => {
4205 let input_ident = ident_stack.pop().unwrap();
4207 let f_tokens = f.emit_tokens(&mut ident_stack);
4208
4209 let input_ident = maybe_observe_for_mut(
4210 f,
4211 input_ident,
4212 &input.metadata().location_id,
4213 &input.metadata().collection_kind,
4214 &metadata.op,
4215 builders_or_callback,
4216 next_stmt_id,
4217 );
4218
4219 let stmt_id = next_stmt_id.get_and_increment();
4220 let map_ident =
4221 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4222
4223 match builders_or_callback {
4224 BuildersOrCallback::Builders(graph_builders) => {
4225 let builder = graph_builders.get_dfir_mut(&out_location);
4226 builder.add_dfir(
4227 parse_quote! {
4228 #map_ident = #input_ident -> map(#f_tokens);
4229 },
4230 None,
4231 Some(&stmt_id.to_string()),
4232 );
4233 }
4234 BuildersOrCallback::Callback(_, node_callback) => {
4235 node_callback(node, next_stmt_id);
4236 }
4237 }
4238
4239 ident_stack.push(map_ident);
4240 }
4241
4242 HydroNode::FlatMap { f, input, metadata } => {
4243 let input_ident = ident_stack.pop().unwrap();
4244 let f_tokens = f.emit_tokens(&mut ident_stack);
4245
4246 let input_ident = maybe_observe_for_mut(
4247 f, input_ident,
4248 &input.metadata().location_id,
4249 &input.metadata().collection_kind,
4250 &metadata.op,
4251 builders_or_callback, next_stmt_id,
4252 );
4253
4254 let stmt_id = next_stmt_id.get_and_increment();
4255 let flat_map_ident =
4256 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4257
4258 match builders_or_callback {
4259 BuildersOrCallback::Builders(graph_builders) => {
4260 let builder = graph_builders.get_dfir_mut(&out_location);
4261 builder.add_dfir(
4262 parse_quote! {
4263 #flat_map_ident = #input_ident -> flat_map(#f_tokens);
4264 },
4265 None,
4266 Some(&stmt_id.to_string()),
4267 );
4268 }
4269 BuildersOrCallback::Callback(_, node_callback) => {
4270 node_callback(node, next_stmt_id);
4271 }
4272 }
4273
4274 ident_stack.push(flat_map_ident);
4275 }
4276
4277 HydroNode::FlatMapStreamBlocking { f, input, metadata } => {
4278 let input_ident = ident_stack.pop().unwrap();
4279 let f_tokens = f.emit_tokens(&mut ident_stack);
4280
4281 let input_ident = maybe_observe_for_mut(
4282 f, input_ident,
4283 &input.metadata().location_id,
4284 &input.metadata().collection_kind,
4285 &metadata.op,
4286 builders_or_callback, next_stmt_id,
4287 );
4288
4289 let stmt_id = next_stmt_id.get_and_increment();
4290 let flat_map_stream_blocking_ident =
4291 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4292
4293 match builders_or_callback {
4294 BuildersOrCallback::Builders(graph_builders) => {
4295 let builder = graph_builders.get_dfir_mut(&out_location);
4296 builder.add_dfir(
4297 parse_quote! {
4298 #flat_map_stream_blocking_ident = #input_ident -> flat_map_stream_blocking(#f_tokens);
4299 },
4300 None,
4301 Some(&stmt_id.to_string()),
4302 );
4303 }
4304 BuildersOrCallback::Callback(_, node_callback) => {
4305 node_callback(node, next_stmt_id);
4306 }
4307 }
4308
4309 ident_stack.push(flat_map_stream_blocking_ident);
4310 }
4311
4312 HydroNode::Filter { f, input, metadata } => {
4313 let input_ident = ident_stack.pop().unwrap();
4314 let f_tokens = f.emit_tokens(&mut ident_stack);
4315
4316 let input_ident = maybe_observe_for_mut(
4317 f, input_ident,
4318 &input.metadata().location_id,
4319 &input.metadata().collection_kind,
4320 &metadata.op,
4321 builders_or_callback, next_stmt_id,
4322 );
4323
4324 let stmt_id = next_stmt_id.get_and_increment();
4325 let filter_ident =
4326 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4327
4328 match builders_or_callback {
4329 BuildersOrCallback::Builders(graph_builders) => {
4330 let builder = graph_builders.get_dfir_mut(&out_location);
4331 builder.add_dfir(
4332 parse_quote! {
4333 #filter_ident = #input_ident -> filter(#f_tokens);
4334 },
4335 None,
4336 Some(&stmt_id.to_string()),
4337 );
4338 }
4339 BuildersOrCallback::Callback(_, node_callback) => {
4340 node_callback(node, next_stmt_id);
4341 }
4342 }
4343
4344 ident_stack.push(filter_ident);
4345 }
4346
4347 HydroNode::FilterMap { f, input, metadata } => {
4348 let input_ident = ident_stack.pop().unwrap();
4349 let f_tokens = f.emit_tokens(&mut ident_stack);
4350
4351 let input_ident = maybe_observe_for_mut(
4352 f, input_ident,
4353 &input.metadata().location_id,
4354 &input.metadata().collection_kind,
4355 &metadata.op,
4356 builders_or_callback, next_stmt_id,
4357 );
4358
4359 let stmt_id = next_stmt_id.get_and_increment();
4360 let filter_map_ident =
4361 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4362
4363 match builders_or_callback {
4364 BuildersOrCallback::Builders(graph_builders) => {
4365 let builder = graph_builders.get_dfir_mut(&out_location);
4366 builder.add_dfir(
4367 parse_quote! {
4368 #filter_map_ident = #input_ident -> filter_map(#f_tokens);
4369 },
4370 None,
4371 Some(&stmt_id.to_string()),
4372 );
4373 }
4374 BuildersOrCallback::Callback(_, node_callback) => {
4375 node_callback(node, next_stmt_id);
4376 }
4377 }
4378
4379 ident_stack.push(filter_map_ident);
4380 }
4381
4382 HydroNode::Sort { .. } => {
4383 let input_ident = ident_stack.pop().unwrap();
4384
4385 let stmt_id = next_stmt_id.get_and_increment();
4386 let sort_ident =
4387 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4388
4389 match builders_or_callback {
4390 BuildersOrCallback::Builders(graph_builders) => {
4391 let builder = graph_builders.get_dfir_mut(&out_location);
4392 builder.add_dfir(
4393 parse_quote! {
4394 #sort_ident = #input_ident -> sort();
4395 },
4396 None,
4397 Some(&stmt_id.to_string()),
4398 );
4399 }
4400 BuildersOrCallback::Callback(_, node_callback) => {
4401 node_callback(node, next_stmt_id);
4402 }
4403 }
4404
4405 ident_stack.push(sort_ident);
4406 }
4407
4408 HydroNode::DeferTick { .. } => {
4409 let input_ident = ident_stack.pop().unwrap();
4410
4411 let stmt_id = next_stmt_id.get_and_increment();
4412 let defer_tick_ident =
4413 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4414
4415 match builders_or_callback {
4416 BuildersOrCallback::Builders(graph_builders) => {
4417 let builder = graph_builders.get_dfir_mut(&out_location);
4418 builder.add_dfir(
4419 parse_quote! {
4420 #defer_tick_ident = #input_ident -> defer_tick_lazy();
4421 },
4422 None,
4423 Some(&stmt_id.to_string()),
4424 );
4425 }
4426 BuildersOrCallback::Callback(_, node_callback) => {
4427 node_callback(node, next_stmt_id);
4428 }
4429 }
4430
4431 ident_stack.push(defer_tick_ident);
4432 }
4433
4434 HydroNode::Enumerate { input, .. } => {
4435 let input_ident = ident_stack.pop().unwrap();
4436
4437 let stmt_id = next_stmt_id.get_and_increment();
4438 let enumerate_ident =
4439 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4440
4441 match builders_or_callback {
4442 BuildersOrCallback::Builders(graph_builders) => {
4443 let builder = graph_builders.get_dfir_mut(&out_location);
4444 let lifetime = if input.metadata().location_id.is_top_level() {
4445 quote!('static)
4446 } else {
4447 quote!('tick)
4448 };
4449 builder.add_dfir(
4450 parse_quote! {
4451 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
4452 },
4453 None,
4454 Some(&stmt_id.to_string()),
4455 );
4456 }
4457 BuildersOrCallback::Callback(_, node_callback) => {
4458 node_callback(node, next_stmt_id);
4459 }
4460 }
4461
4462 ident_stack.push(enumerate_ident);
4463 }
4464
4465 HydroNode::Inspect { f, input, metadata } => {
4466 let input_ident = ident_stack.pop().unwrap();
4467 let f_tokens = f.emit_tokens(&mut ident_stack);
4468
4469 let input_ident = maybe_observe_for_mut(
4470 f, input_ident,
4471 &input.metadata().location_id,
4472 &input.metadata().collection_kind,
4473 &metadata.op,
4474 builders_or_callback, next_stmt_id,
4475 );
4476
4477 let stmt_id = next_stmt_id.get_and_increment();
4478 let inspect_ident =
4479 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4480
4481 match builders_or_callback {
4482 BuildersOrCallback::Builders(graph_builders) => {
4483 let builder = graph_builders.get_dfir_mut(&out_location);
4484 builder.add_dfir(
4485 parse_quote! {
4486 #inspect_ident = #input_ident -> inspect(#f_tokens);
4487 },
4488 None,
4489 Some(&stmt_id.to_string()),
4490 );
4491 }
4492 BuildersOrCallback::Callback(_, node_callback) => {
4493 node_callback(node, next_stmt_id);
4494 }
4495 }
4496
4497 ident_stack.push(inspect_ident);
4498 }
4499
4500 HydroNode::Unique { input, .. } => {
4501 let input_ident = ident_stack.pop().unwrap();
4502
4503 let stmt_id = next_stmt_id.get_and_increment();
4504 let unique_ident =
4505 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4506
4507 match builders_or_callback {
4508 BuildersOrCallback::Builders(graph_builders) => {
4509 let builder = graph_builders.get_dfir_mut(&out_location);
4510 let lifetime = if input.metadata().location_id.is_top_level() {
4511 quote!('static)
4512 } else {
4513 quote!('tick)
4514 };
4515
4516 builder.add_dfir(
4517 parse_quote! {
4518 #unique_ident = #input_ident -> unique::<#lifetime>();
4519 },
4520 None,
4521 Some(&stmt_id.to_string()),
4522 );
4523 }
4524 BuildersOrCallback::Callback(_, node_callback) => {
4525 node_callback(node, next_stmt_id);
4526 }
4527 }
4528
4529 ident_stack.push(unique_ident);
4530 }
4531
// Shared lowering for the stateful aggregations `Fold`, `FoldKeyed`, `Scan` and
// `ScanAsyncBlocking`. All four take an `init` and an `acc` closure and one input.
HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } | HydroNode::ScanAsyncBlocking { .. } => {
    // Choose the DFIR operator name. A top-level bounded `Fold` uses `fold_no_replay`;
    // a top-level bounded `FoldKeyed` is not implemented and hits `todo!`.
    let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
        if input.metadata().location_id.is_top_level()
            && input.metadata().collection_kind.is_bounded()
        {
            parse_quote!(fold_no_replay)
        } else {
            parse_quote!(fold)
        }
    } else if matches!(node, HydroNode::Scan { .. }) {
        parse_quote!(scan)
    } else if matches!(node, HydroNode::ScanAsyncBlocking { .. }) {
        parse_quote!(scan_async_blocking)
    } else if let HydroNode::FoldKeyed { input, .. } = node {
        if input.metadata().location_id.is_top_level()
            && input.metadata().collection_kind.is_bounded()
        {
            todo!("Fold keyed on a top-level bounded collection is not yet supported")
        } else {
            parse_quote!(fold_keyed)
        }
    } else {
        unreachable!()
    };

    let (HydroNode::Fold { input, .. }
    | HydroNode::FoldKeyed { input, .. }
    | HydroNode::Scan { input, .. }
    | HydroNode::ScanAsyncBlocking { input, .. }) = node
    else {
        unreachable!()
    };

    // Top-level inputs persist with `'static`, all others with `'tick`.
    let lifetime = if input.metadata().location_id.is_top_level() {
        quote!('static)
    } else {
        quote!('tick)
    };

    let input_ident = ident_stack.pop().unwrap();

    let (HydroNode::Fold { init, acc, .. }
    | HydroNode::FoldKeyed { init, acc, .. }
    | HydroNode::Scan { init, acc, .. }
    | HydroNode::ScanAsyncBlocking { init, acc, .. }) = &*node
    else {
        unreachable!()
    };

    // `acc` is emitted before `init`; both calls are handed the ident stack.
    // NOTE(review): presumably the order mirrors the reverse of how the closures'
    // referenced idents were pushed - confirm before reordering.
    let acc_tokens = acc.emit_tokens(&mut ident_stack);
    let init_tokens = init.emit_tokens(&mut ident_stack);

    let stmt_id = next_stmt_id.get_and_increment();
    let fold_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            // Case 1: top-level, non-atomic, unbounded `Fold` while the builders request
            // singleton intermediates. Lowered to `scan` so each intermediate state is
            // produced, chained after a one-element source holding the initial value.
            if matches!(node, HydroNode::Fold { .. })
                && node.metadata().location_id.is_top_level()
                && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
                && graph_builders.singleton_intermediates()
                && !node.metadata().collection_kind.is_bounded()
            {
                let HydroNode::Fold { input, .. } = &*node else { unreachable!() };
                // The hook may replace the input with another ident; `None` means no hook.
                let hooked_input_ident = graph_builders.emit_fold_hook(
                    &input.metadata().location_id,
                    &input_ident,
                    &input.metadata().collection_kind,
                    &node.metadata().op,
                );

                let (effective_input, wrapped_acc) = if let Some(ref hooked) = hooked_input_ident {
                    // Hooked input arrives as `Vec` batches: an empty batch yields `None`,
                    // otherwise every value is folded in and the state is cloned out once.
                    let acc: syn::Expr = parse_quote!({
                        let mut __inner = #acc_tokens;
                        move |__state, __batch: Vec<_>| {
                            if __batch.is_empty() {
                                return None;
                            }
                            for __value in __batch {
                                __inner(__state, __value);
                            }
                            Some(__state.clone())
                        }
                    });
                    (hooked, acc)
                } else {
                    // Unhooked input: fold in one value and clone the state out each time.
                    let acc: syn::Expr = parse_quote!({
                        let mut __inner = #acc_tokens;
                        move |__state, __value| {
                            __inner(__state, __value);
                            Some(__state.clone())
                        }
                    });
                    (&input_ident, acc)
                };

                let builder = graph_builders.get_dfir_mut(&out_location);
                builder.add_dfir(
                    parse_quote! {
                        source_iter([(#init_tokens)()]) -> [0]#fold_ident;
                        #effective_input -> scan::<#lifetime>(#init_tokens, #wrapped_acc) -> [1]#fold_ident;
                        #fold_ident = chain();
                    },
                    None,
                    Some(&stmt_id.to_string()),
                );

                // Record outputs that were produced through a hook.
                if hooked_input_ident.is_some() {
                    fold_hooked_idents.insert(fold_ident.to_string());
                }
            // Case 2: the same conditions for `FoldKeyed`. Lowered to a `scan` over a
            // `HashMap` of per-key state, yielding `(key, cloned state)` per input pair.
            } else if matches!(node, HydroNode::FoldKeyed { .. })
                && node.metadata().location_id.is_top_level()
                && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
                && graph_builders.singleton_intermediates()
                && !node.metadata().collection_kind.is_bounded()
            {
                let HydroNode::FoldKeyed { input, .. } = &*node else { unreachable!() };
                let hooked_input_ident = graph_builders.emit_fold_hook(
                    &input.metadata().location_id,
                    &input_ident,
                    &input.metadata().collection_kind,
                    &node.metadata().op,
                );
                let builder = graph_builders.get_dfir_mut(&out_location);

                let wrapped_acc: syn::Expr = parse_quote!({
                    let mut __init = #init_tokens;
                    let mut __inner = #acc_tokens;
                    move |__state, __kv: (_, _)| {
                        let __state = __state
                            .entry(::std::clone::Clone::clone(&__kv.0))
                            .or_insert_with(|| (__init)());
                        __inner(__state, __kv.1);
                        Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
                    }
                });

                if let Some(hooked_input_ident) = hooked_input_ident {
                    // Hooked input is flattened before being scanned.
                    builder.add_dfir(
                        parse_quote! {
                            #fold_ident = #hooked_input_ident -> flatten() -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
                        },
                        None,
                        Some(&stmt_id.to_string()),
                    );

                    fold_hooked_idents.insert(fold_ident.to_string());
                } else {
                    builder.add_dfir(
                        parse_quote! {
                            #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #wrapped_acc);
                        },
                        None,
                        Some(&stmt_id.to_string()),
                    );
                }
            // Case 3: non-top-level `Fold`/`FoldKeyed` with singleton intermediates.
            // The regular operator is used, fed by the hooked ident when a hook exists.
            } else if (matches!(node, HydroNode::Fold { .. })
                || matches!(node, HydroNode::FoldKeyed { .. }))
                && !node.metadata().location_id.is_top_level()
                && graph_builders.singleton_intermediates()
            {
                let input_ref = match &*node {
                    HydroNode::Fold { input, .. } => input,
                    HydroNode::FoldKeyed { input, .. } => input,
                    _ => unreachable!(),
                };
                let hooked_input_ident = graph_builders.emit_fold_hook(
                    &input_ref.metadata().location_id,
                    &input_ident,
                    &input_ref.metadata().collection_kind,
                    &node.metadata().op,
                );

                let actual_input = hooked_input_ident.as_ref().unwrap_or(&input_ident);
                let builder = graph_builders.get_dfir_mut(&out_location);
                builder.add_dfir(
                    parse_quote! {
                        #fold_ident = #actual_input -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
                    },
                    None,
                    Some(&stmt_id.to_string()),
                );
            // Default: emit the chosen operator directly on the input.
            } else {
                let builder = graph_builders.get_dfir_mut(&out_location);
                builder.add_dfir(
                    parse_quote! {
                        #fold_ident = #input_ident -> #operator::<#lifetime>(#init_tokens, #acc_tokens);
                    },
                    None,
                    Some(&stmt_id.to_string()),
                );
            }
        }
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(fold_ident);
}
4734
// Shared lowering for `Reduce` and `ReduceKeyed`, which both take one closure `f` and one input.
HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
    // Choose the DFIR operator name. A top-level bounded `Reduce` uses `reduce_no_replay`;
    // a top-level bounded `ReduceKeyed` is not implemented and hits `todo!`.
    let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
        if input.metadata().location_id.is_top_level()
            && input.metadata().collection_kind.is_bounded()
        {
            parse_quote!(reduce_no_replay)
        } else {
            parse_quote!(reduce)
        }
    } else if let HydroNode::ReduceKeyed { input, .. } = node {
        if input.metadata().location_id.is_top_level()
            && input.metadata().collection_kind.is_bounded()
        {
            todo!(
                "Calling keyed reduce on a top-level bounded collection is not supported"
            )
        } else {
            parse_quote!(reduce_keyed)
        }
    } else {
        unreachable!()
    };

    let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
    else {
        unreachable!()
    };

    // Top-level inputs persist with `'static`, all others with `'tick`.
    let lifetime = if input.metadata().location_id.is_top_level() {
        quote!('static)
    } else {
        quote!('tick)
    };

    let input_ident = ident_stack.pop().unwrap();

    let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
    else {
        unreachable!()
    };

    let f_tokens = f.emit_tokens(&mut ident_stack);

    let stmt_id = next_stmt_id.get_and_increment();
    let reduce_ident =
        syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());

    match builders_or_callback {
        BuildersOrCallback::Builders(graph_builders) => {
            // Top-level, non-atomic, unbounded reductions are not implemented when the
            // builders request singleton intermediates; both variants panic via `todo!`.
            if matches!(node, HydroNode::Reduce { .. })
                && node.metadata().location_id.is_top_level()
                && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
                && graph_builders.singleton_intermediates()
                && !node.metadata().collection_kind.is_bounded()
            {
                todo!(
                    "Reduce with optional intermediates is not yet supported in simulator"
                );
            } else if matches!(node, HydroNode::ReduceKeyed { .. })
                && node.metadata().location_id.is_top_level()
                && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
                && graph_builders.singleton_intermediates()
                && !node.metadata().collection_kind.is_bounded()
            {
                todo!(
                    "Reduce keyed with optional intermediates is not yet supported in simulator"
                );
            } else {
                // Default: emit the chosen operator directly on the input.
                let builder = graph_builders.get_dfir_mut(&out_location);
                builder.add_dfir(
                    parse_quote! {
                        #reduce_ident = #input_ident -> #operator::<#lifetime>(#f_tokens);
                    },
                    None,
                    Some(&stmt_id.to_string()),
                );
            }
        }
        BuildersOrCallback::Callback(_, node_callback) => {
            node_callback(node, next_stmt_id);
        }
    }

    ident_stack.push(reduce_ident);
}
4820
4821 HydroNode::ReduceKeyedWatermark {
4822 f,
4823 input,
4824 metadata,
4825 ..
4826 } => {
4827 let lifetime = if input.metadata().location_id.is_top_level() {
4828 quote!('static)
4829 } else {
4830 quote!('tick)
4831 };
4832
4833 let watermark_ident = ident_stack.pop().unwrap();
4835 let input_ident = ident_stack.pop().unwrap();
4836 let f_tokens = f.emit_tokens(&mut ident_stack);
4837
4838 let stmt_id = next_stmt_id.get_and_increment();
4839 let chain_ident = syn::Ident::new(
4840 &format!("reduce_keyed_watermark_chain_{}", stmt_id),
4841 Span::call_site(),
4842 );
4843
4844 let fold_ident =
4845 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4846
4847 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
4848 && input.metadata().collection_kind.is_bounded()
4849 {
4850 parse_quote!(fold_no_replay)
4851 } else {
4852 parse_quote!(fold)
4853 };
4854
4855 match builders_or_callback {
4856 BuildersOrCallback::Builders(graph_builders) => {
4857 if metadata.location_id.is_top_level()
4858 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
4859 && graph_builders.singleton_intermediates()
4860 && !metadata.collection_kind.is_bounded()
4861 {
4862 todo!(
4863 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
4864 )
4865 } else {
4866 let builder = graph_builders.get_dfir_mut(&out_location);
4867 builder.add_dfir(
4868 parse_quote! {
4869 #chain_ident = chain();
4870 #input_ident
4871 -> map(|x| (Some(x), None))
4872 -> [0]#chain_ident;
4873 #watermark_ident
4874 -> map(|watermark| (None, Some(watermark)))
4875 -> [1]#chain_ident;
4876
4877 #fold_ident = #chain_ident
4878 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
4879 let __reduce_keyed_fn = #f_tokens;
4880 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
4881 if let Some((k, v)) = opt_payload {
4882 if let Some(curr_watermark) = *opt_curr_watermark {
4883 if k < curr_watermark {
4884 return;
4885 }
4886 }
4887 match map.entry(k) {
4888 ::std::collections::hash_map::Entry::Vacant(e) => {
4889 e.insert(v);
4890 }
4891 ::std::collections::hash_map::Entry::Occupied(mut e) => {
4892 __reduce_keyed_fn(e.get_mut(), v);
4893 }
4894 }
4895 } else {
4896 let watermark = opt_watermark.unwrap();
4897 if let Some(curr_watermark) = *opt_curr_watermark {
4898 if watermark <= curr_watermark {
4899 return;
4900 }
4901 }
4902 map.retain(|k, _| *k >= watermark);
4903 *opt_curr_watermark = Some(watermark);
4904 }
4905 }
4906 })
4907 -> flat_map(|(map, _curr_watermark)| map);
4908 },
4909 None,
4910 Some(&stmt_id.to_string()),
4911 );
4912 }
4913 }
4914 BuildersOrCallback::Callback(_, node_callback) => {
4915 node_callback(node, next_stmt_id);
4916 }
4917 }
4918
4919 ident_stack.push(fold_ident);
4920 }
4921
4922 HydroNode::Network {
4923 networking_info,
4924 serialize_fn: serialize_pipeline,
4925 instantiate_fn,
4926 deserialize_fn: deserialize_pipeline,
4927 input,
4928 ..
4929 } => {
4930 let input_ident = ident_stack.pop().unwrap();
4931
4932 let stmt_id = next_stmt_id.get_and_increment();
4933 let receiver_stream_ident =
4934 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4935
4936 match builders_or_callback {
4937 BuildersOrCallback::Builders(graph_builders) => {
4938 let (sink_expr, source_expr) = match instantiate_fn {
4939 DebugInstantiate::Building => (
4940 syn::parse_quote!(DUMMY_SINK),
4941 syn::parse_quote!(DUMMY_SOURCE),
4942 ),
4943
4944 DebugInstantiate::Finalized(finalized) => {
4945 (finalized.sink.clone(), finalized.source.clone())
4946 }
4947 };
4948
4949 graph_builders.create_network(
4950 &input.metadata().location_id,
4951 &out_location,
4952 input_ident,
4953 &receiver_stream_ident,
4954 serialize_pipeline.as_ref(),
4955 sink_expr,
4956 source_expr,
4957 deserialize_pipeline.as_ref(),
4958 stmt_id,
4959 networking_info,
4960 );
4961 }
4962 BuildersOrCallback::Callback(_, node_callback) => {
4963 node_callback(node, next_stmt_id);
4964 }
4965 }
4966
4967 ident_stack.push(receiver_stream_ident);
4968 }
4969
4970 HydroNode::ExternalInput {
4971 instantiate_fn,
4972 deserialize_fn: deserialize_pipeline,
4973 ..
4974 } => {
4975 let stmt_id = next_stmt_id.get_and_increment();
4976 let receiver_stream_ident =
4977 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
4978
4979 match builders_or_callback {
4980 BuildersOrCallback::Builders(graph_builders) => {
4981 let (_, source_expr) = match instantiate_fn {
4982 DebugInstantiate::Building => (
4983 syn::parse_quote!(DUMMY_SINK),
4984 syn::parse_quote!(DUMMY_SOURCE),
4985 ),
4986
4987 DebugInstantiate::Finalized(finalized) => {
4988 (finalized.sink.clone(), finalized.source.clone())
4989 }
4990 };
4991
4992 graph_builders.create_external_source(
4993 &out_location,
4994 source_expr,
4995 &receiver_stream_ident,
4996 deserialize_pipeline.as_ref(),
4997 stmt_id,
4998 );
4999 }
5000 BuildersOrCallback::Callback(_, node_callback) => {
5001 node_callback(node, next_stmt_id);
5002 }
5003 }
5004
5005 ident_stack.push(receiver_stream_ident);
5006 }
5007
5008 HydroNode::Counter {
5009 tag,
5010 duration,
5011 prefix,
5012 ..
5013 } => {
5014 let input_ident = ident_stack.pop().unwrap();
5015
5016 let stmt_id = next_stmt_id.get_and_increment();
5017 let counter_ident =
5018 syn::Ident::new(&format!("stream_{}", stmt_id), Span::call_site());
5019
5020 match builders_or_callback {
5021 BuildersOrCallback::Builders(graph_builders) => {
5022 let arg = format!("{}({})", prefix, tag);
5023 let builder = graph_builders.get_dfir_mut(&out_location);
5024 builder.add_dfir(
5025 parse_quote! {
5026 #counter_ident = #input_ident -> _counter(#arg, #duration);
5027 },
5028 None,
5029 Some(&stmt_id.to_string()),
5030 );
5031 }
5032 BuildersOrCallback::Callback(_, node_callback) => {
5033 node_callback(node, next_stmt_id);
5034 }
5035 }
5036
5037 ident_stack.push(counter_ident);
5038 }
5039 }
5040 },
5041 seen_tees,
5042 false,
5043 );
5044
5045 let ret = ident_stack
5046 .pop()
5047 .expect("ident_stack should have exactly one element after traversal");
5048 assert!(
5049 ident_stack.is_empty(),
5050 "ident_stack should be empty after popping the final ident, but has {} remaining element(s). \
5051 This indicates a bug in the code gen: some node pushed idents that were never consumed.",
5052 ident_stack.len()
5053 );
5054 ret
5055 }
5056
5057 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
5058 match self {
5059 HydroNode::Placeholder => {
5060 panic!()
5061 }
5062 HydroNode::Cast { .. }
5063 | HydroNode::ObserveNonDet { .. }
5064 | HydroNode::UnboundSingleton { .. }
5065 | HydroNode::AssertIsConsistent { .. } => {}
5066 HydroNode::Source { source, .. } => match source {
5067 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
5068 HydroSource::ExternalNetwork()
5069 | HydroSource::Spin()
5070 | HydroSource::ClusterMembers(_, _)
5071 | HydroSource::Embedded(_)
5072 | HydroSource::EmbeddedSingleton(_) => {} },
5074 HydroNode::SingletonSource { value, .. } => {
5075 transform(value);
5076 }
5077 HydroNode::CycleSource { .. }
5078 | HydroNode::Tee { .. }
5079 | HydroNode::Reference { .. }
5080 | HydroNode::YieldConcat { .. }
5081 | HydroNode::BeginAtomic { .. }
5082 | HydroNode::EndAtomic { .. }
5083 | HydroNode::Batch { .. }
5084 | HydroNode::Chain { .. }
5085 | HydroNode::MergeOrdered { .. }
5086 | HydroNode::ChainFirst { .. }
5087 | HydroNode::CrossProduct { .. }
5088 | HydroNode::CrossSingleton { .. }
5089 | HydroNode::ResolveFutures { .. }
5090 | HydroNode::ResolveFuturesBlocking { .. }
5091 | HydroNode::ResolveFuturesOrdered { .. }
5092 | HydroNode::Join { .. }
5093 | HydroNode::JoinHalf { .. }
5094 | HydroNode::Difference { .. }
5095 | HydroNode::AntiJoin { .. }
5096 | HydroNode::DeferTick { .. }
5097 | HydroNode::Enumerate { .. }
5098 | HydroNode::Unique { .. }
5099 | HydroNode::Sort { .. } => {}
5100 HydroNode::Map { f, .. }
5101 | HydroNode::FlatMap { f, .. }
5102 | HydroNode::FlatMapStreamBlocking { f, .. }
5103 | HydroNode::Filter { f, .. }
5104 | HydroNode::FilterMap { f, .. }
5105 | HydroNode::Inspect { f, .. }
5106 | HydroNode::Partition { f, .. }
5107 | HydroNode::Reduce { f, .. }
5108 | HydroNode::ReduceKeyed { f, .. }
5109 | HydroNode::ReduceKeyedWatermark { f, .. } => {
5110 transform(&mut f.expr);
5111 }
5112 HydroNode::Fold { init, acc, .. }
5113 | HydroNode::Scan { init, acc, .. }
5114 | HydroNode::ScanAsyncBlocking { init, acc, .. }
5115 | HydroNode::FoldKeyed { init, acc, .. } => {
5116 transform(&mut init.expr);
5117 transform(&mut acc.expr);
5118 }
5119 HydroNode::Network {
5120 serialize_fn,
5121 deserialize_fn,
5122 ..
5123 } => {
5124 if let Some(serialize_fn) = serialize_fn {
5125 transform(serialize_fn);
5126 }
5127 if let Some(deserialize_fn) = deserialize_fn {
5128 transform(deserialize_fn);
5129 }
5130 }
5131 HydroNode::ExternalInput { deserialize_fn, .. } => {
5132 if let Some(deserialize_fn) = deserialize_fn {
5133 transform(deserialize_fn);
5134 }
5135 }
5136 HydroNode::Counter { duration, .. } => {
5137 transform(duration);
5138 }
5139 }
5140 }
5141
5142 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
5143 &self.metadata().op
5144 }
5145
5146 pub fn metadata(&self) -> &HydroIrMetadata {
5147 match self {
5148 HydroNode::Placeholder => {
5149 panic!()
5150 }
5151 HydroNode::Cast { metadata, .. }
5152 | HydroNode::ObserveNonDet { metadata, .. }
5153 | HydroNode::AssertIsConsistent { metadata, .. }
5154 | HydroNode::UnboundSingleton { metadata, .. }
5155 | HydroNode::Source { metadata, .. }
5156 | HydroNode::SingletonSource { metadata, .. }
5157 | HydroNode::CycleSource { metadata, .. }
5158 | HydroNode::Tee { metadata, .. }
5159 | HydroNode::Reference { metadata, .. }
5160 | HydroNode::Partition { metadata, .. }
5161 | HydroNode::YieldConcat { metadata, .. }
5162 | HydroNode::BeginAtomic { metadata, .. }
5163 | HydroNode::EndAtomic { metadata, .. }
5164 | HydroNode::Batch { metadata, .. }
5165 | HydroNode::Chain { metadata, .. }
5166 | HydroNode::MergeOrdered { metadata, .. }
5167 | HydroNode::ChainFirst { metadata, .. }
5168 | HydroNode::CrossProduct { metadata, .. }
5169 | HydroNode::CrossSingleton { metadata, .. }
5170 | HydroNode::Join { metadata, .. }
5171 | HydroNode::JoinHalf { metadata, .. }
5172 | HydroNode::Difference { metadata, .. }
5173 | HydroNode::AntiJoin { metadata, .. }
5174 | HydroNode::ResolveFutures { metadata, .. }
5175 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5176 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5177 | HydroNode::Map { metadata, .. }
5178 | HydroNode::FlatMap { metadata, .. }
5179 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5180 | HydroNode::Filter { metadata, .. }
5181 | HydroNode::FilterMap { metadata, .. }
5182 | HydroNode::DeferTick { metadata, .. }
5183 | HydroNode::Enumerate { metadata, .. }
5184 | HydroNode::Inspect { metadata, .. }
5185 | HydroNode::Unique { metadata, .. }
5186 | HydroNode::Sort { metadata, .. }
5187 | HydroNode::Scan { metadata, .. }
5188 | HydroNode::ScanAsyncBlocking { metadata, .. }
5189 | HydroNode::Fold { metadata, .. }
5190 | HydroNode::FoldKeyed { metadata, .. }
5191 | HydroNode::Reduce { metadata, .. }
5192 | HydroNode::ReduceKeyed { metadata, .. }
5193 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5194 | HydroNode::ExternalInput { metadata, .. }
5195 | HydroNode::Network { metadata, .. }
5196 | HydroNode::Counter { metadata, .. } => metadata,
5197 }
5198 }
5199
5200 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
5201 &mut self.metadata_mut().op
5202 }
5203
5204 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
5205 match self {
5206 HydroNode::Placeholder => {
5207 panic!()
5208 }
5209 HydroNode::Cast { metadata, .. }
5210 | HydroNode::ObserveNonDet { metadata, .. }
5211 | HydroNode::AssertIsConsistent { metadata, .. }
5212 | HydroNode::UnboundSingleton { metadata, .. }
5213 | HydroNode::Source { metadata, .. }
5214 | HydroNode::SingletonSource { metadata, .. }
5215 | HydroNode::CycleSource { metadata, .. }
5216 | HydroNode::Tee { metadata, .. }
5217 | HydroNode::Reference { metadata, .. }
5218 | HydroNode::Partition { metadata, .. }
5219 | HydroNode::YieldConcat { metadata, .. }
5220 | HydroNode::BeginAtomic { metadata, .. }
5221 | HydroNode::EndAtomic { metadata, .. }
5222 | HydroNode::Batch { metadata, .. }
5223 | HydroNode::Chain { metadata, .. }
5224 | HydroNode::MergeOrdered { metadata, .. }
5225 | HydroNode::ChainFirst { metadata, .. }
5226 | HydroNode::CrossProduct { metadata, .. }
5227 | HydroNode::CrossSingleton { metadata, .. }
5228 | HydroNode::Join { metadata, .. }
5229 | HydroNode::JoinHalf { metadata, .. }
5230 | HydroNode::Difference { metadata, .. }
5231 | HydroNode::AntiJoin { metadata, .. }
5232 | HydroNode::ResolveFutures { metadata, .. }
5233 | HydroNode::ResolveFuturesBlocking { metadata, .. }
5234 | HydroNode::ResolveFuturesOrdered { metadata, .. }
5235 | HydroNode::Map { metadata, .. }
5236 | HydroNode::FlatMap { metadata, .. }
5237 | HydroNode::FlatMapStreamBlocking { metadata, .. }
5238 | HydroNode::Filter { metadata, .. }
5239 | HydroNode::FilterMap { metadata, .. }
5240 | HydroNode::DeferTick { metadata, .. }
5241 | HydroNode::Enumerate { metadata, .. }
5242 | HydroNode::Inspect { metadata, .. }
5243 | HydroNode::Unique { metadata, .. }
5244 | HydroNode::Sort { metadata, .. }
5245 | HydroNode::Scan { metadata, .. }
5246 | HydroNode::ScanAsyncBlocking { metadata, .. }
5247 | HydroNode::Fold { metadata, .. }
5248 | HydroNode::FoldKeyed { metadata, .. }
5249 | HydroNode::Reduce { metadata, .. }
5250 | HydroNode::ReduceKeyed { metadata, .. }
5251 | HydroNode::ReduceKeyedWatermark { metadata, .. }
5252 | HydroNode::ExternalInput { metadata, .. }
5253 | HydroNode::Network { metadata, .. }
5254 | HydroNode::Counter { metadata, .. } => metadata,
5255 }
5256 }
5257
5258 pub fn input(&self) -> Vec<&HydroNode> {
5259 match self {
5260 HydroNode::Placeholder => {
5261 panic!()
5262 }
5263 HydroNode::Source { .. }
5264 | HydroNode::SingletonSource { .. }
5265 | HydroNode::ExternalInput { .. }
5266 | HydroNode::CycleSource { .. }
5267 | HydroNode::Tee { .. }
5268 | HydroNode::Reference { .. }
5269 | HydroNode::Partition { .. } => {
5270 vec![]
5272 }
5273 HydroNode::Cast { inner, .. }
5274 | HydroNode::ObserveNonDet { inner, .. }
5275 | HydroNode::YieldConcat { inner, .. }
5276 | HydroNode::BeginAtomic { inner, .. }
5277 | HydroNode::EndAtomic { inner, .. }
5278 | HydroNode::Batch { inner, .. }
5279 | HydroNode::UnboundSingleton { inner, .. }
5280 | HydroNode::AssertIsConsistent { inner, .. } => {
5281 vec![inner]
5282 }
5283 HydroNode::Chain { first, second, .. } => {
5284 vec![first, second]
5285 }
5286 HydroNode::MergeOrdered { first, second, .. } => {
5287 vec![first, second]
5288 }
5289 HydroNode::ChainFirst { first, second, .. } => {
5290 vec![first, second]
5291 }
5292 HydroNode::CrossProduct { left, right, .. }
5293 | HydroNode::CrossSingleton { left, right, .. }
5294 | HydroNode::Join { left, right, .. }
5295 | HydroNode::JoinHalf { left, right, .. } => {
5296 vec![left, right]
5297 }
5298 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
5299 vec![pos, neg]
5300 }
5301 HydroNode::Map { input, .. }
5302 | HydroNode::FlatMap { input, .. }
5303 | HydroNode::FlatMapStreamBlocking { input, .. }
5304 | HydroNode::Filter { input, .. }
5305 | HydroNode::FilterMap { input, .. }
5306 | HydroNode::Sort { input, .. }
5307 | HydroNode::DeferTick { input, .. }
5308 | HydroNode::Enumerate { input, .. }
5309 | HydroNode::Inspect { input, .. }
5310 | HydroNode::Unique { input, .. }
5311 | HydroNode::Network { input, .. }
5312 | HydroNode::Counter { input, .. }
5313 | HydroNode::ResolveFutures { input, .. }
5314 | HydroNode::ResolveFuturesBlocking { input, .. }
5315 | HydroNode::ResolveFuturesOrdered { input, .. }
5316 | HydroNode::Fold { input, .. }
5317 | HydroNode::FoldKeyed { input, .. }
5318 | HydroNode::Reduce { input, .. }
5319 | HydroNode::ReduceKeyed { input, .. }
5320 | HydroNode::Scan { input, .. }
5321 | HydroNode::ScanAsyncBlocking { input, .. } => {
5322 vec![input]
5323 }
5324 HydroNode::ReduceKeyedWatermark {
5325 input, watermark, ..
5326 } => {
5327 vec![input, watermark]
5328 }
5329 }
5330 }
5331
5332 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
5333 self.input()
5334 .iter()
5335 .map(|input_node| input_node.metadata())
5336 .collect()
5337 }
5338
5339 pub fn is_shared_with_others(&self) -> bool {
5343 match self {
5344 HydroNode::Tee { inner, .. } | HydroNode::Partition { inner, .. } => {
5345 Rc::strong_count(&inner.0) > 1
5346 }
5347 HydroNode::Reference { .. } => false,
5350 _ => false,
5351 }
5352 }
5353
5354 pub fn print_root(&self) -> String {
5355 match self {
5356 HydroNode::Placeholder => {
5357 panic!()
5358 }
5359 HydroNode::Cast { .. } => "Cast()".to_owned(),
5360 HydroNode::UnboundSingleton { .. } => "UnboundSingleton()".to_owned(),
5361 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
5362 HydroNode::AssertIsConsistent { .. } => "AssertIsConsistent()".to_owned(),
5363 HydroNode::Source { source, .. } => format!("Source({:?})", source),
5364 HydroNode::SingletonSource {
5365 value,
5366 first_tick_only,
5367 ..
5368 } => format!(
5369 "SingletonSource({:?}, first_tick_only={})",
5370 value, first_tick_only
5371 ),
5372 HydroNode::CycleSource { cycle_id, .. } => format!("CycleSource({})", cycle_id),
5373 HydroNode::Tee { inner, .. } => {
5374 format!("Tee({})", inner.0.borrow().print_root())
5375 }
5376 HydroNode::Reference { inner, kind, .. } => {
5377 format!("Reference({:?}, {})", kind, inner.0.borrow().print_root())
5378 }
5379 HydroNode::Partition { f, is_true, .. } => {
5380 format!("Partition({:?}, is_true={})", f, is_true)
5381 }
5382 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
5383 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
5384 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
5385 HydroNode::Batch { .. } => "Batch()".to_owned(),
5386 HydroNode::Chain { first, second, .. } => {
5387 format!("Chain({}, {})", first.print_root(), second.print_root())
5388 }
5389 HydroNode::MergeOrdered { first, second, .. } => {
5390 format!(
5391 "MergeOrdered({}, {})",
5392 first.print_root(),
5393 second.print_root()
5394 )
5395 }
5396 HydroNode::ChainFirst { first, second, .. } => {
5397 format!(
5398 "ChainFirst({}, {})",
5399 first.print_root(),
5400 second.print_root()
5401 )
5402 }
5403 HydroNode::CrossProduct { left, right, .. } => {
5404 format!(
5405 "CrossProduct({}, {})",
5406 left.print_root(),
5407 right.print_root()
5408 )
5409 }
5410 HydroNode::CrossSingleton { left, right, .. } => {
5411 format!(
5412 "CrossSingleton({}, {})",
5413 left.print_root(),
5414 right.print_root()
5415 )
5416 }
5417 HydroNode::Join { left, right, .. } => {
5418 format!("Join({}, {})", left.print_root(), right.print_root())
5419 }
5420 HydroNode::JoinHalf { left, right, .. } => {
5421 format!("JoinHalf({}, {})", left.print_root(), right.print_root())
5422 }
5423 HydroNode::Difference { pos, neg, .. } => {
5424 format!("Difference({}, {})", pos.print_root(), neg.print_root())
5425 }
5426 HydroNode::AntiJoin { pos, neg, .. } => {
5427 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
5428 }
5429 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
5430 HydroNode::ResolveFuturesBlocking { .. } => "ResolveFuturesBlocking()".to_owned(),
5431 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
5432 HydroNode::Map { f, .. } => format!("Map({:?})", f),
5433 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
5434 HydroNode::FlatMapStreamBlocking { f, .. } => format!("FlatMapStreamBlocking({:?})", f),
5435 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
5436 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
5437 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
5438 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
5439 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
5440 HydroNode::Unique { .. } => "Unique()".to_owned(),
5441 HydroNode::Sort { .. } => "Sort()".to_owned(),
5442 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
5443 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
5444 HydroNode::ScanAsyncBlocking { init, acc, .. } => {
5445 format!("ScanAsyncBlocking({:?}, {:?})", init, acc)
5446 }
5447 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
5448 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
5449 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
5450 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
5451 HydroNode::Network { .. } => "Network()".to_owned(),
5452 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
5453 HydroNode::Counter { tag, duration, .. } => {
5454 format!("Counter({:?}, {:?})", tag, duration)
5455 }
5456 }
5457 }
5458}
5459
/// Instantiates one network channel between two top-level locations.
///
/// Looks up the deployed node for each endpoint (a process or a cluster),
/// takes the next port from each, and asks the [`Deploy`] provider `D` for the
/// sink/source expressions and the connect callback of the matching topology:
/// `o2o` (process to process), `o2m` (process to cluster), `m2o` (cluster to
/// process) or `m2m` (cluster to cluster).
///
/// Returns `(sink, source, connect_fn)`.
///
/// # Panics
///
/// Panics if an endpoint is missing from `processes` / `clusters`, or if
/// either location is a `Tick` or `Atomic` location.
#[cfg(feature = "build")]
fn instantiate_network<'a, D>(
    env: &mut D::InstantiateEnv,
    from_location: &LocationId,
    to_location: &LocationId,
    processes: &SparseSecondaryMap<LocationKey, D::Process>,
    clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
    name: Option<&str>,
    networking_info: &crate::networking::NetworkingInfo,
) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
where
    D: Deploy<'a>,
{
    // Fetch a clone of the deployed process for `key`, panicking if it is missing.
    let process_at = |key: LocationKey| {
        processes
            .get(key)
            .unwrap_or_else(|| panic!("A process used in the graph was not instantiated: {}", key))
            .clone()
    };
    // Fetch a clone of the deployed cluster for `key`, panicking if it is missing.
    let cluster_at = |key: LocationKey| {
        clusters
            .get(key)
            .unwrap_or_else(|| panic!("A cluster used in the graph was not instantiated: {}", key))
            .clone()
    };

    // In every arm: resolve the sender, then the receiver, take the sender's
    // port before the receiver's, and build the sink/source pair before the
    // connect callback.
    let (endpoints, connect_fn) = match (from_location, to_location) {
        (&LocationId::Process(src), &LocationId::Process(dst)) => {
            let sender = process_at(src);
            let receiver = process_at(dst);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let exprs = D::o2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2o_connect(&sender, &sink_port, &receiver, &source_port);
            (exprs, connect)
        }
        (&LocationId::Process(src), &LocationId::Cluster(dst)) => {
            let sender = process_at(src);
            let receiver = cluster_at(dst);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let exprs = D::o2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::o2m_connect(&sender, &sink_port, &receiver, &source_port);
            (exprs, connect)
        }
        (&LocationId::Cluster(src), &LocationId::Process(dst)) => {
            let sender = cluster_at(src);
            let receiver = process_at(dst);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let exprs = D::m2o_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2o_connect(&sender, &sink_port, &receiver, &source_port);
            (exprs, connect)
        }
        (&LocationId::Cluster(src), &LocationId::Cluster(dst)) => {
            let sender = cluster_at(src);
            let receiver = cluster_at(dst);
            let sink_port = sender.next_port();
            let source_port = receiver.next_port();

            let exprs = D::m2m_sink_source(
                env,
                &sender,
                &sink_port,
                &receiver,
                &source_port,
                name,
                networking_info,
            );
            let connect = D::m2m_connect(&sender, &sink_port, &receiver, &source_port);
            (exprs, connect)
        }
        // Tick and atomic locations are rejected as network endpoints.
        (LocationId::Tick(_, _) | LocationId::Atomic(_), _)
        | (_, LocationId::Tick(_, _) | LocationId::Atomic(_)) => panic!(),
    };

    let (sink, source) = endpoints;
    (sink, source, connect_fn)
}
5601
5602#[cfg(test)]
5603mod serde_test;
5604
// Unit tests for the IR types and for `simplify_q_macro`.
#[cfg(test)]
mod test {
    use std::mem::size_of;

    use stageleft::{QuotedWithContext, q};

    use super::*;

    // Asserts that `HydroNode` occupies exactly 264 bytes. Ignored without the
    // `build` feature, since the expected size counts feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_node_size() {
        assert_eq!(size_of::<HydroNode>(), 264);
    }

    // Asserts that `HydroRoot` occupies exactly 136 bytes. Ignored without the
    // `build` feature, since the expected size counts feature-gated fields.
    #[test]
    #[cfg_attr(
        not(feature = "build"),
        ignore = "expects inclusion of feature-gated fields"
    )]
    fn hydro_root_size() {
        assert_eq!(size_of::<HydroRoot>(), 136);
    }

    // An expression parsed from plain source (`x + y`) is expected to come
    // back from `simplify_q_macro` unchanged.
    #[test]
    fn test_simplify_q_macro_basic() {
        let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
        let result = simplify_q_macro(simple_expr.clone());
        assert_eq!(result, simple_expr);
    }

    // Feeds a closure spliced from `q!` through `simplify_q_macro` and
    // snapshot-tests the resulting token string.
    #[test]
    fn test_simplify_q_macro_actual_stageleft_call() {
        let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }

    // Same as above, but the quoted code is a block that declares a local
    // before the `move` closure, so the closure's `|` is not the first token.
    #[test]
    fn test_closure_no_pipe_at_start() {
        let stageleft_call = q!({
            let foo = 123;
            move |b: usize| b + foo
        })
        .splice_fn1_ctx(&());
        let result = simplify_q_macro(stageleft_call);
        hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
    }
}