Skip to main content

hydro_lang/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![cfg_attr(not(stageleft_trybuild), warn(missing_docs))]
3
4//! Hydro is a high-level distributed programming framework for Rust.
5//! Hydro can help you quickly write scalable distributed services that are correct by construction.
6//! Much like Rust helps with memory safety, Hydro helps with [distributed safety](https://hydro.run/docs/hydro/reference/correctness).
7//!
8//! The core Hydro API involves [live collections](https://hydro.run/docs/hydro/reference/live-collections/), which represent asynchronously
9//! updated sources of data such as incoming network requests and application state. The most common live collection is
10//! [`live_collections::stream::Stream`]; other live collections can be found in [`live_collections`].
11//!
12//! Hydro uses a unique compilation approach where you define deployment logic as Rust code alongside your distributed system implementation.
13//! For more details on this API, see the [Hydro docs](https://hydro.run/docs/hydro/reference/deploy/) and the [`deploy`] module.
14
15stageleft::stageleft_no_entry_crate!();
16
17#[cfg(feature = "runtime_support")]
18#[cfg_attr(docsrs, doc(cfg(feature = "runtime_support")))]
19#[doc(hidden)]
20pub mod runtime_support {
21    pub use ::{bincode, dfir_rs, slotmap, stageleft};
22    #[cfg(feature = "sim")]
23    pub use colored;
24    #[cfg(feature = "deploy_integration")]
25    pub use hydro_deploy_integration;
26    #[cfg(feature = "tokio")]
27    pub use tokio;
28
29    #[cfg(feature = "deploy_integration")]
30    pub mod launch;
31}
32
33#[doc(hidden)]
34pub mod macro_support {
35    pub use copy_span;
36}
37
38pub mod prelude {
39    // taken from `tokio`
40    //! A "prelude" for users of the `hydro_lang` crate.
41    //!
42    //! This prelude is similar to the standard library's prelude in that you'll almost always want to import its entire contents, but unlike the standard library's prelude you'll have to do so manually:
43    //! ```
44    //! # #![allow(warnings)]
45    //! use hydro_lang::prelude::*;
46    //! ```
47    //!
48    //! The prelude may grow over time as additional items see ubiquitous use.
49
50    pub use stageleft::q;
51
52    pub use crate::compile::builder::FlowBuilder;
53    pub use crate::live_collections::boundedness::{Bounded, Unbounded};
54    pub use crate::live_collections::keyed_singleton::{KeyedSingleton, MonotonicKeys};
55    pub use crate::live_collections::keyed_stream::KeyedStream;
56    pub use crate::live_collections::optional::Optional;
57    pub use crate::live_collections::singleton::Singleton;
58    pub use crate::live_collections::sliced::sliced;
59    pub use crate::live_collections::stream::Stream;
60    pub use crate::location::{Cluster, External, Location as _, Process, Tick};
61    pub use crate::networking::TCP;
62    pub use crate::nondet::{NonDet, nondet};
63    pub use crate::properties::{ConsistencyProof, ManualProof, manual_proof};
64
65    /// A macro to set up a Hydro crate.
66    #[macro_export]
67    macro_rules! setup {
68        () => {
69            stageleft::stageleft_no_entry_crate!();
70
71            #[cfg(test)]
72            mod test_init {
73                #[ctor::ctor]
74                fn init() {
75                    $crate::compile::init_test();
76                }
77            }
78        };
79    }
80}
81
82#[cfg(feature = "dfir_context")]
83#[cfg_attr(docsrs, doc(cfg(feature = "dfir_context")))]
84pub mod runtime_context;
85
86pub mod nondet;
87
88pub mod live_collections;
89
90pub mod location;
91
92pub mod networking;
93
94pub mod properties;
95
96pub mod telemetry;
97
98#[cfg(any(
99    feature = "deploy",
100    feature = "sim",
101    feature = "deploy_integration" // hidden internal feature enabled in the trybuild
102))]
103#[cfg_attr(docsrs, doc(cfg(any(feature = "deploy", feature = "sim"))))]
104pub mod deploy;
105
106#[cfg(feature = "sim")]
107#[cfg_attr(docsrs, doc(cfg(feature = "sim")))]
108pub mod sim;
109
110pub mod forward_handle;
111
112pub mod compile;
113
114pub mod handoff_ref;
115
116mod manual_expr;
117
118#[cfg(stageleft_runtime)]
119#[cfg(feature = "viz")]
120#[cfg_attr(docsrs, doc(cfg(feature = "viz")))]
121#[expect(missing_docs, reason = "TODO")]
122pub mod viz;
123
124#[cfg_attr(
125    feature = "stageleft_macro_entrypoint",
126    expect(missing_docs, reason = "staging internals")
127)]
128mod staging_util;
129
130#[cfg(feature = "deploy")]
131#[cfg_attr(docsrs, doc(cfg(feature = "deploy")))]
132pub mod test_util;
133
134#[cfg(feature = "build")]
135#[ctor::ctor]
136fn init_rewrites() {
137    stageleft::add_private_reexport(
138        vec!["tokio_util", "codec", "lines_codec"],
139        vec!["tokio_util", "codec"],
140    );
141    // TODO: remove once stageleft is updated with this rewrite built-in
142    stageleft::add_private_reexport(
143        vec!["core", "iter", "sources", "empty"],
144        vec!["std", "iter"],
145    );
146}
147
148#[cfg(all(test, feature = "trybuild"))]
149mod test_init {
150    #[ctor::ctor]
151    fn init() {
152        crate::compile::init_test();
153    }
154}
155
156/// Creates a newtype wrapper around an integer type.
157///
158/// Usage:
159/// ```rust,ignore
160/// hydro_lang::newtype_counter! {
161///     /// My counter.
162///     pub struct MyCounter(u32);
163///
164///     /// My secret counter.
165///     struct SecretCounter(u64);
166/// }
167/// ```
168#[doc(hidden)]
169#[macro_export]
170macro_rules! newtype_counter {
171    (
172        $(
173            $( #[$attr:meta] )*
174            $vis:vis struct $name:ident($typ:ty);
175        )*
176    ) => {
177        $(
178            $( #[$attr] )*
179            #[repr(transparent)]
180            #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
181            $vis struct $name($typ);
182
183            #[allow(clippy::allow_attributes, dead_code, reason = "macro-generated methods may be unused")]
184            impl $name {
185                /// Reveals the inner ID.
186                pub fn into_inner(self) -> $typ {
187                    self.0
188                }
189            }
190
191            impl std::fmt::Display for $name {
192                fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193                    write!(f, "{}", self.0)
194                }
195            }
196
197            impl serde::ser::Serialize for $name {
198                fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
199                where
200                    S: serde::Serializer
201                {
202                    serde::ser::Serialize::serialize(&self.0, serializer)
203                }
204            }
205
206            impl<'de> serde::de::Deserialize<'de> for $name {
207                fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
208                where
209                    D: serde::Deserializer<'de>
210                {
211                    serde::de::Deserialize::deserialize(deserializer).map(Self)
212                }
213            }
214
215            #[sealed::sealed]
216            impl $crate::Countable for $name {
217                fn from_count(val: usize) -> Self {
218                    Self(val as $typ)
219                }
220            }
221        )*
222    };
223}
224
225/// Sealed trait implemented by ID types produced via [`newtype_counter!`].
226///
227/// This allows [`Counter<T>`] to mint new IDs without exposing a public
228/// constructor on the ID types themselves.
229#[doc(hidden)]
230#[sealed::sealed]
231pub trait Countable {
232    #[doc(hidden)]
233    fn from_count(val: usize) -> Self;
234}
235
236/// An opaque counter that produces unique IDs of type `T` via [`Counter::get_and_increment`].
237///
238/// This is separate from the ID types themselves so that holding an ID does not
239/// give the ability to mint new IDs.
240#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
241pub struct Counter<T: Countable>(usize, std::marker::PhantomData<T>);
242
243impl<T: Countable> Default for Counter<T> {
244    fn default() -> Self {
245        Self(0, std::marker::PhantomData)
246    }
247}
248
249impl<T: Countable> Counter<T> {
250    /// Gets the current counter value and increments for the next call.
251    pub fn get_and_increment(&mut self) -> T {
252        let id = self.0;
253        self.0 += 1;
254        T::from_count(id)
255    }
256
257    /// Returns an iterator from zero up to (but excluding) the current counter value.
258    ///
259    /// This is useful for iterating already-allocated values.
260    pub fn range_up_to(&self) -> impl DoubleEndedIterator<Item = T> + std::iter::FusedIterator {
261        (0..self.0).map(T::from_count)
262    }
263}