Skip to main content

hydro_lang/live_collections/
keyed_singleton.rs

1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::builder::CycleId;
18use crate::compile::ir::{
19    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::stream::{Ordering, Retries};
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::DeferTick;
28use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
29use crate::manual_expr::ManualExpr;
30use crate::nondet::{NonDet, nondet};
31use crate::properties::manual_proof;
32
/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
///
/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
/// removed / changed), this also includes an additional variant [`BoundedValue`], which
/// indicates that entries may be added over time, but once an entry is added it will never be
/// removed and its value will never change.
pub trait KeyedSingletonBound {
    /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
    type UnderlyingBound: Boundedness;
    /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
    type ValueBound: Boundedness;

    /// The type of the keyed singleton if the value for each key is immutable.
    type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;

    /// The type of the keyed singleton if the value for each key may change asynchronously.
    type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;

    /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> KeyedSingletonBoundKind;
}
54
55impl KeyedSingletonBound for Unbounded {
56    type UnderlyingBound = Unbounded;
57    type ValueBound = Unbounded;
58    type WithBoundedValue = BoundedValue;
59    type WithUnboundedValue = Unbounded;
60
61    fn bound_kind() -> KeyedSingletonBoundKind {
62        KeyedSingletonBoundKind::Unbounded
63    }
64}
65
66impl KeyedSingletonBound for Bounded {
67    type UnderlyingBound = Bounded;
68    type ValueBound = Bounded;
69    type WithBoundedValue = Bounded;
70    type WithUnboundedValue = UnreachableBound;
71
72    fn bound_kind() -> KeyedSingletonBoundKind {
73        KeyedSingletonBoundKind::Bounded
74    }
75}
76
/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
/// its value is bounded and will never change. The set of entries itself is still unbounded (its
/// [`KeyedSingletonBound::UnderlyingBound`] is [`Unbounded`]), so new entries may appear asynchronously.
pub struct BoundedValue;
81
82impl KeyedSingletonBound for BoundedValue {
83    type UnderlyingBound = Unbounded;
84    type ValueBound = Bounded;
85    type WithBoundedValue = BoundedValue;
86    type WithUnboundedValue = Unbounded;
87
88    fn bound_kind() -> KeyedSingletonBoundKind {
89        KeyedSingletonBoundKind::BoundedValue
90    }
91}
92
/// Placeholder bound for the combination of a bounded underlying stream with unbounded values.
///
/// It fills the `WithUnboundedValue` slot of [`Bounded`]; requesting its
/// [`KeyedSingletonBoundKind`] panics.
#[doc(hidden)]
pub struct UnreachableBound;
95
96impl KeyedSingletonBound for UnreachableBound {
97    type UnderlyingBound = Bounded;
98    type ValueBound = Unbounded;
99
100    type WithBoundedValue = Bounded;
101    type WithUnboundedValue = UnreachableBound;
102
103    fn bound_kind() -> KeyedSingletonBoundKind {
104        unreachable!("UnreachableBound cannot be instantiated")
105    }
106}
107
/// Mapping from keys of type `K` to values of type `V`.
///
/// Keyed Singletons capture an asynchronously updated mapping from keys of type `K` to values of
/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
/// keys cannot be removed and the value for each key is immutable.
///
/// Type Parameters:
/// - `K`: the type of the key for each entry
/// - `V`: the type of the value for each entry
/// - `Loc`: the [`Location`] where the keyed singleton is materialized
/// - `Bound`: tracks whether the entries are:
///     - [`Bounded`] (local and finite)
///     - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
///     - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
    // The location where this keyed singleton is materialized.
    pub(crate) location: Loc,
    // The IR node backing this collection. It lives in a `RefCell` so that it can be
    // replaced through a shared reference (the `Clone` impl does this).
    pub(crate) ir_node: RefCell<HydroNode>,

    // Ties the type parameters to the struct without storing values of those types.
    _phantom: PhantomData<(K, V, Loc, Bound)>,
}
130
131impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
132    for KeyedSingleton<K, V, Loc, Bound>
133{
134    fn clone(&self) -> Self {
135        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
136            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
137            *self.ir_node.borrow_mut() = HydroNode::Tee {
138                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
139                metadata: self.location.new_node_metadata(Self::collection_kind()),
140            };
141        }
142
143        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
144            KeyedSingleton {
145                location: self.location.clone(),
146                ir_node: HydroNode::Tee {
147                    inner: TeeNode(inner.0.clone()),
148                    metadata: metadata.clone(),
149                }
150                .into(),
151                _phantom: PhantomData,
152            }
153        } else {
154            unreachable!()
155        }
156    }
157}
158
159impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
160    for KeyedSingleton<K, V, L, B>
161where
162    L: Location<'a> + NoTick,
163{
164    type Location = L;
165
166    fn create_source(cycle_id: CycleId, location: L) -> Self {
167        KeyedSingleton {
168            location: location.clone(),
169            ir_node: RefCell::new(HydroNode::CycleSource {
170                cycle_id,
171                metadata: location.new_node_metadata(Self::collection_kind()),
172            }),
173            _phantom: PhantomData,
174        }
175    }
176}
177
178impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
179where
180    L: Location<'a>,
181{
182    type Location = Tick<L>;
183
184    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
185        KeyedSingleton::new(
186            location.clone(),
187            HydroNode::CycleSource {
188                cycle_id,
189                metadata: location.new_node_metadata(Self::collection_kind()),
190            },
191        )
192    }
193}
194
/// Trait adapter for bounded keyed singletons inside a tick.
impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
where
    L: Location<'a>,
{
    fn defer_tick(self) -> Self {
        // NOTE(review): this presumably resolves to an inherent `KeyedSingleton::defer_tick`
        // defined outside this excerpt rather than recursing into this trait method — confirm.
        KeyedSingleton::defer_tick(self)
    }
}
203
204impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
205    for KeyedSingleton<K, V, L, B>
206where
207    L: Location<'a> + NoTick,
208{
209    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
210        assert_eq!(
211            Location::id(&self.location),
212            expected_location,
213            "locations do not match"
214        );
215        self.location
216            .flow_state()
217            .borrow_mut()
218            .push_root(HydroRoot::CycleSink {
219                cycle_id,
220                input: Box::new(self.ir_node.into_inner()),
221                op_metadata: HydroIrOpMetadata::new(),
222            });
223    }
224}
225
226impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
227where
228    L: Location<'a>,
229{
230    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
231        assert_eq!(
232            Location::id(&self.location),
233            expected_location,
234            "locations do not match"
235        );
236        self.location
237            .flow_state()
238            .borrow_mut()
239            .push_root(HydroRoot::CycleSink {
240                cycle_id,
241                input: Box::new(self.ir_node.into_inner()),
242                op_metadata: HydroIrOpMetadata::new(),
243            });
244    }
245}
246
247impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
248    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
249        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
250        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
251
252        KeyedSingleton {
253            location,
254            ir_node: RefCell::new(ir_node),
255            _phantom: PhantomData,
256        }
257    }
258
259    /// Returns the [`Location`] where this keyed singleton is being materialized.
260    pub fn location(&self) -> &L {
261        &self.location
262    }
263}
264
/// Counts the entries of a bounded keyed singleton by counting its flattened entry stream.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let all_entries = me.entries();
    all_entries.count()
}
271
/// Collects the entries of a bounded keyed singleton into a single `HashMap`.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let ordered_entries = me.entries().assume_ordering(nondet!(
        /// Because this is a keyed singleton, there is only one value per key.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
290
291impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
292    pub(crate) fn collection_kind() -> CollectionKind {
293        CollectionKind::KeyedSingleton {
294            bound: B::bound_kind(),
295            key_type: stageleft::quote_type::<K>().into(),
296            value_type: stageleft::quote_type::<V>().into(),
297        }
298    }
299
300    /// Transforms each value by invoking `f` on each element, with keys staying the same
301    /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
302    ///
303    /// If you do not want to modify the stream and instead only want to view
304    /// each item use [`KeyedSingleton::inspect`] instead.
305    ///
306    /// # Example
307    /// ```rust
308    /// # #[cfg(feature = "deploy")] {
309    /// # use hydro_lang::prelude::*;
310    /// # use futures::StreamExt;
311    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
312    /// let keyed_singleton = // { 1: 2, 2: 4 }
313    /// # process
314    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
315    /// #     .into_keyed()
316    /// #     .first();
317    /// keyed_singleton.map(q!(|v| v + 1))
318    /// #   .entries()
319    /// # }, |mut stream| async move {
320    /// // { 1: 3, 2: 5 }
321    /// # let mut results = Vec::new();
322    /// # for _ in 0..2 {
323    /// #     results.push(stream.next().await.unwrap());
324    /// # }
325    /// # results.sort();
326    /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
327    /// # }));
328    /// # }
329    /// ```
330    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
331    where
332        F: Fn(V) -> U + 'a,
333    {
334        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
335        let map_f = q!({
336            let orig = f;
337            move |(k, v)| (k, orig(v))
338        })
339        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
340        .into();
341
342        KeyedSingleton::new(
343            self.location.clone(),
344            HydroNode::Map {
345                f: map_f,
346                input: Box::new(self.ir_node.into_inner()),
347                metadata: self
348                    .location
349                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
350            },
351        )
352    }
353
354    /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
355    /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
356    ///
357    /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
358    /// the new value `U`. The key remains unchanged in the output.
359    ///
360    /// # Example
361    /// ```rust
362    /// # #[cfg(feature = "deploy")] {
363    /// # use hydro_lang::prelude::*;
364    /// # use futures::StreamExt;
365    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
366    /// let keyed_singleton = // { 1: 2, 2: 4 }
367    /// # process
368    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
369    /// #     .into_keyed()
370    /// #     .first();
371    /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
372    /// #   .entries()
373    /// # }, |mut stream| async move {
374    /// // { 1: 3, 2: 6 }
375    /// # let mut results = Vec::new();
376    /// # for _ in 0..2 {
377    /// #     results.push(stream.next().await.unwrap());
378    /// # }
379    /// # results.sort();
380    /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
381    /// # }));
382    /// # }
383    /// ```
384    pub fn map_with_key<U, F>(
385        self,
386        f: impl IntoQuotedMut<'a, F, L> + Copy,
387    ) -> KeyedSingleton<K, U, L, B>
388    where
389        F: Fn((K, V)) -> U + 'a,
390        K: Clone,
391    {
392        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
393        let map_f = q!({
394            let orig = f;
395            move |(k, v)| {
396                let out = orig((Clone::clone(&k), v));
397                (k, out)
398            }
399        })
400        .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
401        .into();
402
403        KeyedSingleton::new(
404            self.location.clone(),
405            HydroNode::Map {
406                f: map_f,
407                input: Box::new(self.ir_node.into_inner()),
408                metadata: self
409                    .location
410                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
411            },
412        )
413    }
414
415    /// Gets the number of keys in the keyed singleton.
416    ///
417    /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
418    /// since keys may be added / removed over time. When the set of keys changes, the count will
419    /// be asynchronously updated.
420    ///
421    /// # Example
422    /// ```rust
423    /// # #[cfg(feature = "deploy")] {
424    /// # use hydro_lang::prelude::*;
425    /// # use futures::StreamExt;
426    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
427    /// # let tick = process.tick();
428    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
429    /// # process
430    /// #     .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
431    /// #     .into_keyed()
432    /// #     .batch(&tick, nondet!(/** test */))
433    /// #     .first();
434    /// keyed_singleton.key_count()
435    /// # .all_ticks()
436    /// # }, |mut stream| async move {
437    /// // 3
438    /// # assert_eq!(stream.next().await.unwrap(), 3);
439    /// # }));
440    /// # }
441    /// ```
442    pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
443        if B::ValueBound::BOUNDED {
444            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
445                location: self.location,
446                ir_node: self.ir_node,
447                _phantom: PhantomData,
448            };
449
450            me.entries().count()
451        } else if L::is_top_level()
452            && let Some(tick) = self.location.try_tick()
453        {
454            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
455                location: self.location,
456                ir_node: self.ir_node,
457                _phantom: PhantomData,
458            };
459
460            let out =
461                key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
462                    .latest();
463            Singleton::new(out.location, out.ir_node.into_inner())
464        } else {
465            panic!("Unbounded KeyedSingleton inside a tick");
466        }
467    }
468
469    /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
470    ///
471    /// As the values for each key are updated asynchronously, the `HashMap` will be updated
472    /// asynchronously as well.
473    ///
474    /// # Example
475    /// ```rust
476    /// # #[cfg(feature = "deploy")] {
477    /// # use hydro_lang::prelude::*;
478    /// # use futures::StreamExt;
479    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
480    /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
481    /// # process
482    /// #     .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
483    /// #     .into_keyed()
484    /// #     .batch(&process.tick(), nondet!(/** test */))
485    /// #     .first();
486    /// keyed_singleton.into_singleton()
487    /// # .all_ticks()
488    /// # }, |mut stream| async move {
489    /// // { 1: "a", 2: "b", 3: "c" }
490    /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
491    /// # }));
492    /// # }
493    /// ```
494    pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
495    where
496        K: Eq + Hash,
497    {
498        if B::ValueBound::BOUNDED {
499            let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
500                location: self.location,
501                ir_node: self.ir_node,
502                _phantom: PhantomData,
503            };
504
505            me.entries()
506                .assume_ordering(nondet!(
507                    /// Because this is a keyed singleton, there is only one value per key.
508                ))
509                .fold(
510                    q!(|| HashMap::new()),
511                    q!(|map, (k, v)| {
512                        // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
513                        map.insert(k, v);
514                    }),
515                )
516        } else if L::is_top_level()
517            && let Some(tick) = self.location.try_tick()
518        {
519            let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
520                location: self.location,
521                ir_node: self.ir_node,
522                _phantom: PhantomData,
523            };
524
525            let out = into_singleton_inside_tick(
526                me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
527            )
528            .latest();
529            Singleton::new(out.location, out.ir_node.into_inner())
530        } else {
531            panic!("Unbounded KeyedSingleton inside a tick");
532        }
533    }
534
535    /// An operator which allows you to "name" a `HydroNode`.
536    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
537    pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
538        {
539            let mut node = self.ir_node.borrow_mut();
540            let metadata = node.metadata_mut();
541            metadata.tag = Some(name.to_owned());
542        }
543        self
544    }
545
546    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
547    /// implies that `B == Bounded`.
548    pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
549    where
550        B: IsBounded,
551    {
552        KeyedSingleton::new(self.location, self.ir_node.into_inner())
553    }
554
555    /// Gets the value associated with a specific key from the keyed singleton.
556    /// Returns `None` if the key is `None` or there is no associated value.
557    ///
558    /// # Example
559    /// ```rust
560    /// # #[cfg(feature = "deploy")] {
561    /// # use hydro_lang::prelude::*;
562    /// # use futures::StreamExt;
563    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
564    /// let tick = process.tick();
565    /// let keyed_data = process
566    ///     .source_iter(q!(vec![(1, 2), (2, 3)]))
567    ///     .into_keyed()
568    ///     .batch(&tick, nondet!(/** test */))
569    ///     .first();
570    /// let key = tick.singleton(q!(1));
571    /// keyed_data.get(key).all_ticks()
572    /// # }, |mut stream| async move {
573    /// // 2
574    /// # assert_eq!(stream.next().await.unwrap(), 2);
575    /// # }));
576    /// # }
577    /// ```
578    pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
579    where
580        B: IsBounded,
581        K: Hash + Eq,
582    {
583        self.make_bounded()
584            .into_keyed_stream()
585            .get(key)
586            .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
587            .first()
588    }
589
590    /// Emit a keyed stream containing keys shared between the keyed singleton and the
591    /// keyed stream, where each value in the output keyed stream is a tuple of
592    /// (the keyed singleton's value, the keyed stream's value).
593    ///
594    /// # Example
595    /// ```rust
596    /// # #[cfg(feature = "deploy")] {
597    /// # use hydro_lang::prelude::*;
598    /// # use futures::StreamExt;
599    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
600    /// let tick = process.tick();
601    /// let keyed_data = process
602    ///     .source_iter(q!(vec![(1, 10), (2, 20)]))
603    ///     .into_keyed()
604    ///     .batch(&tick, nondet!(/** test */))
605    ///     .first();
606    /// let other_data = process
607    ///     .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
608    ///     .into_keyed()
609    ///     .batch(&tick, nondet!(/** test */));
610    /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
611    /// # }, |mut stream| async move {
612    /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
613    /// # let mut results = vec![];
614    /// # for _ in 0..3 {
615    /// #     results.push(stream.next().await.unwrap());
616    /// # }
617    /// # results.sort();
618    /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
619    /// # }));
620    /// # }
621    /// ```
622    pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2>(
623        self,
624        keyed_stream: KeyedStream<K, V2, L, Bounded, O2, R2>,
625    ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R2>
626    where
627        B: IsBounded,
628        K: Eq + Hash,
629    {
630        self.make_bounded()
631            .entries()
632            .weaken_retries::<R2>()
633            .join(keyed_stream.entries())
634            .into_keyed()
635    }
636
637    /// Emit a keyed singleton containing all keys shared between two keyed singletons,
638    /// where each value in the output keyed singleton is a tuple of
639    /// (self.value, other.value).
640    ///
641    /// # Example
642    /// ```rust
643    /// # #[cfg(feature = "deploy")] {
644    /// # use hydro_lang::prelude::*;
645    /// # use futures::StreamExt;
646    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
647    /// # let tick = process.tick();
648    /// let requests = // { 1: 10, 2: 20, 3: 30 }
649    /// # process
650    /// #     .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
651    /// #     .into_keyed()
652    /// #     .batch(&tick, nondet!(/** test */))
653    /// #     .first();
654    /// let other = // { 1: 100, 2: 200, 4: 400 }
655    /// # process
656    /// #     .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
657    /// #     .into_keyed()
658    /// #     .batch(&tick, nondet!(/** test */))
659    /// #     .first();
660    /// requests.join_keyed_singleton(other)
661    /// # .entries().all_ticks()
662    /// # }, |mut stream| async move {
663    /// // { 1: (10, 100), 2: (20, 200) }
664    /// # let mut results = vec![];
665    /// # for _ in 0..2 {
666    /// #     results.push(stream.next().await.unwrap());
667    /// # }
668    /// # results.sort();
669    /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
670    /// # }));
671    /// # }
672    /// ```
673    pub fn join_keyed_singleton<V2: Clone>(
674        self,
675        other: KeyedSingleton<K, V2, L, Bounded>,
676    ) -> KeyedSingleton<K, (V, V2), L, Bounded>
677    where
678        B: IsBounded,
679        K: Eq + Hash,
680    {
681        let result_stream = self
682            .make_bounded()
683            .entries()
684            .join(other.entries())
685            .into_keyed();
686
687        // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
688        KeyedSingleton::new(
689            result_stream.location.clone(),
690            HydroNode::Cast {
691                inner: Box::new(result_stream.ir_node.into_inner()),
692                metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
693                    K,
694                    (V, V2),
695                    L,
696                    Bounded,
697                >::collection_kind(
698                )),
699            },
700        )
701    }
702
703    /// For each value in `self`, find the matching key in `lookup`.
704    /// The output is a keyed singleton with the key from `self`, and a value
705    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
706    /// If the key is not present in `lookup`, the option will be [`None`].
707    ///
708    /// # Example
709    /// ```rust
710    /// # #[cfg(feature = "deploy")] {
711    /// # use hydro_lang::prelude::*;
712    /// # use futures::StreamExt;
713    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
714    /// # let tick = process.tick();
715    /// let requests = // { 1: 10, 2: 20 }
716    /// # process
717    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
718    /// #     .into_keyed()
719    /// #     .batch(&tick, nondet!(/** test */))
720    /// #     .first();
721    /// let other_data = // { 10: 100, 11: 110 }
722    /// # process
723    /// #     .source_iter(q!(vec![(10, 100), (11, 110)]))
724    /// #     .into_keyed()
725    /// #     .batch(&tick, nondet!(/** test */))
726    /// #     .first();
727    /// requests.lookup_keyed_singleton(other_data)
728    /// # .entries().all_ticks()
729    /// # }, |mut stream| async move {
730    /// // { 1: (10, Some(100)), 2: (20, None) }
731    /// # let mut results = vec![];
732    /// # for _ in 0..2 {
733    /// #     results.push(stream.next().await.unwrap());
734    /// # }
735    /// # results.sort();
736    /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
737    /// # }));
738    /// # }
739    /// ```
740    pub fn lookup_keyed_singleton<V2>(
741        self,
742        lookup: KeyedSingleton<V, V2, L, Bounded>,
743    ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
744    where
745        B: IsBounded,
746        K: Eq + Hash + Clone,
747        V: Eq + Hash + Clone,
748        V2: Clone,
749    {
750        let result_stream = self
751            .make_bounded()
752            .into_keyed_stream()
753            .lookup_keyed_stream(lookup.into_keyed_stream());
754
755        // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
756        KeyedSingleton::new(
757            result_stream.location.clone(),
758            HydroNode::Cast {
759                inner: Box::new(result_stream.ir_node.into_inner()),
760                metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
761                    K,
762                    (V, Option<V2>),
763                    L,
764                    Bounded,
765                >::collection_kind(
766                )),
767            },
768        )
769    }
770
771    /// For each value in `self`, find the matching key in `lookup`.
772    /// The output is a keyed stream with the key from `self`, and a value
773    /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
774    /// If the key is not present in `lookup`, the option will be [`None`].
775    ///
776    /// # Example
777    /// ```rust
778    /// # #[cfg(feature = "deploy")] {
779    /// # use hydro_lang::prelude::*;
780    /// # use futures::StreamExt;
781    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
782    /// # let tick = process.tick();
783    /// let requests = // { 1: 10, 2: 20 }
784    /// # process
785    /// #     .source_iter(q!(vec![(1, 10), (2, 20)]))
786    /// #     .into_keyed()
787    /// #     .batch(&tick, nondet!(/** test */))
788    /// #     .first();
789    /// let other_data = // { 10: 100, 10: 110 }
790    /// # process
791    /// #     .source_iter(q!(vec![(10, 100), (10, 110)]))
792    /// #     .into_keyed()
793    /// #     .batch(&tick, nondet!(/** test */));
794    /// requests.lookup_keyed_stream(other_data)
795    /// # .entries().all_ticks()
796    /// # }, |mut stream| async move {
797    /// // { 1: [(10, Some(100)), (10, Some(110))], 2: (20, None) }
798    /// # let mut results = vec![];
799    /// # for _ in 0..3 {
800    /// #     results.push(stream.next().await.unwrap());
801    /// # }
802    /// # results.sort();
803    /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
804    /// # }));
805    /// # }
806    /// ```
807    pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
808        self,
809        lookup: KeyedStream<V, V2, L, Bounded, O, R>,
810    ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
811    where
812        B: IsBounded,
813        K: Eq + Hash + Clone,
814        V: Eq + Hash + Clone,
815        V2: Clone,
816    {
817        self.make_bounded()
818            .entries()
819            .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
820            .into_keyed()
821            .lookup_keyed_stream(lookup)
822    }
823}
824
825impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
826    KeyedSingleton<K, V, L, B>
827{
828    /// Flattens the keyed singleton into an unordered stream of key-value pairs.
829    ///
830    /// The value for each key must be bounded, otherwise the resulting stream elements would be
831    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
832    /// into the output.
833    ///
834    /// # Example
835    /// ```rust
836    /// # #[cfg(feature = "deploy")] {
837    /// # use hydro_lang::prelude::*;
838    /// # use futures::StreamExt;
839    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
840    /// let keyed_singleton = // { 1: 2, 2: 4 }
841    /// # process
842    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
843    /// #     .into_keyed()
844    /// #     .first();
845    /// keyed_singleton.entries()
846    /// # }, |mut stream| async move {
847    /// // (1, 2), (2, 4) in any order
848    /// # let mut results = Vec::new();
849    /// # for _ in 0..2 {
850    /// #     results.push(stream.next().await.unwrap());
851    /// # }
852    /// # results.sort();
853    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
854    /// # }));
855    /// # }
856    /// ```
857    pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
858        self.into_keyed_stream().entries()
859    }
860
861    /// Flattens the keyed singleton into an unordered stream of just the values.
862    ///
863    /// The value for each key must be bounded, otherwise the resulting stream elements would be
864    /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
865    /// into the output.
866    ///
867    /// # Example
868    /// ```rust
869    /// # #[cfg(feature = "deploy")] {
870    /// # use hydro_lang::prelude::*;
871    /// # use futures::StreamExt;
872    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
873    /// let keyed_singleton = // { 1: 2, 2: 4 }
874    /// # process
875    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
876    /// #     .into_keyed()
877    /// #     .first();
878    /// keyed_singleton.values()
879    /// # }, |mut stream| async move {
880    /// // 2, 4 in any order
881    /// # let mut results = Vec::new();
882    /// # for _ in 0..2 {
883    /// #     results.push(stream.next().await.unwrap());
884    /// # }
885    /// # results.sort();
886    /// # assert_eq!(results, vec![2, 4]);
887    /// # }));
888    /// # }
889    /// ```
890    pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
891        let map_f = q!(|(_, v)| v)
892            .splice_fn1_ctx::<(K, V), V>(&self.location)
893            .into();
894
895        Stream::new(
896            self.location.clone(),
897            HydroNode::Map {
898                f: map_f,
899                input: Box::new(self.ir_node.into_inner()),
900                metadata: self.location.new_node_metadata(Stream::<
901                    V,
902                    L,
903                    B::UnderlyingBound,
904                    NoOrder,
905                    ExactlyOnce,
906                >::collection_kind()),
907            },
908        )
909    }
910
911    /// Flattens the keyed singleton into an unordered stream of just the keys.
912    ///
913    /// The value for each key must be bounded, otherwise the removal of keys would result in
914    /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
915    /// into the output.
916    ///
917    /// # Example
918    /// ```rust
919    /// # #[cfg(feature = "deploy")] {
920    /// # use hydro_lang::prelude::*;
921    /// # use futures::StreamExt;
922    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
923    /// let keyed_singleton = // { 1: 2, 2: 4 }
924    /// # process
925    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
926    /// #     .into_keyed()
927    /// #     .first();
928    /// keyed_singleton.keys()
929    /// # }, |mut stream| async move {
930    /// // 1, 2 in any order
931    /// # let mut results = Vec::new();
932    /// # for _ in 0..2 {
933    /// #     results.push(stream.next().await.unwrap());
934    /// # }
935    /// # results.sort();
936    /// # assert_eq!(results, vec![1, 2]);
937    /// # }));
938    /// # }
939    /// ```
940    pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
941        self.entries().map(q!(|(k, _)| k))
942    }
943
944    /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
945    /// entries whose keys are not in the provided stream.
946    ///
947    /// # Example
948    /// ```rust
949    /// # #[cfg(feature = "deploy")] {
950    /// # use hydro_lang::prelude::*;
951    /// # use futures::StreamExt;
952    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
953    /// let tick = process.tick();
954    /// let keyed_singleton = // { 1: 2, 2: 4 }
955    /// # process
956    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
957    /// #     .into_keyed()
958    /// #     .first()
959    /// #     .batch(&tick, nondet!(/** test */));
960    /// let keys_to_remove = process
961    ///     .source_iter(q!(vec![1]))
962    ///     .batch(&tick, nondet!(/** test */));
963    /// keyed_singleton.filter_key_not_in(keys_to_remove)
964    /// #   .entries().all_ticks()
965    /// # }, |mut stream| async move {
966    /// // { 2: 4 }
967    /// # for w in vec![(2, 4)] {
968    /// #     assert_eq!(stream.next().await.unwrap(), w);
969    /// # }
970    /// # }));
971    /// # }
972    /// ```
973    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
974        self,
975        other: Stream<K, L, Bounded, O2, R2>,
976    ) -> Self
977    where
978        K: Hash + Eq,
979    {
980        check_matching_location(&self.location, &other.location);
981
982        KeyedSingleton::new(
983            self.location.clone(),
984            HydroNode::AntiJoin {
985                pos: Box::new(self.ir_node.into_inner()),
986                neg: Box::new(other.ir_node.into_inner()),
987                metadata: self.location.new_node_metadata(Self::collection_kind()),
988            },
989        )
990    }
991
992    /// An operator which allows you to "inspect" each value of a keyed singleton without
993    /// modifying it. The closure `f` is called on a reference to each value. This is
994    /// mainly useful for debugging, and should not be used to generate side-effects.
995    ///
996    /// # Example
997    /// ```rust
998    /// # #[cfg(feature = "deploy")] {
999    /// # use hydro_lang::prelude::*;
1000    /// # use futures::StreamExt;
1001    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1002    /// let keyed_singleton = // { 1: 2, 2: 4 }
1003    /// # process
1004    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1005    /// #     .into_keyed()
1006    /// #     .first();
1007    /// keyed_singleton
1008    ///     .inspect(q!(|v| println!("{}", v)))
1009    /// #   .entries()
1010    /// # }, |mut stream| async move {
1011    /// // { 1: 2, 2: 4 }
1012    /// # for w in vec![(1, 2), (2, 4)] {
1013    /// #     assert_eq!(stream.next().await.unwrap(), w);
1014    /// # }
1015    /// # }));
1016    /// # }
1017    /// ```
1018    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1019    where
1020        F: Fn(&V) + 'a,
1021    {
1022        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1023        let inspect_f = q!({
1024            let orig = f;
1025            move |t: &(_, _)| orig(&t.1)
1026        })
1027        .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1028        .into();
1029
1030        KeyedSingleton::new(
1031            self.location.clone(),
1032            HydroNode::Inspect {
1033                f: inspect_f,
1034                input: Box::new(self.ir_node.into_inner()),
1035                metadata: self.location.new_node_metadata(Self::collection_kind()),
1036            },
1037        )
1038    }
1039
1040    /// An operator which allows you to "inspect" each entry of a keyed singleton without
1041    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1042    /// mainly useful for debugging, and should not be used to generate side-effects.
1043    ///
1044    /// # Example
1045    /// ```rust
1046    /// # #[cfg(feature = "deploy")] {
1047    /// # use hydro_lang::prelude::*;
1048    /// # use futures::StreamExt;
1049    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1050    /// let keyed_singleton = // { 1: 2, 2: 4 }
1051    /// # process
1052    /// #     .source_iter(q!(vec![(1, 2), (2, 4)]))
1053    /// #     .into_keyed()
1054    /// #     .first();
1055    /// keyed_singleton
1056    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1057    /// #   .entries()
1058    /// # }, |mut stream| async move {
1059    /// // { 1: 2, 2: 4 }
1060    /// # for w in vec![(1, 2), (2, 4)] {
1061    /// #     assert_eq!(stream.next().await.unwrap(), w);
1062    /// # }
1063    /// # }));
1064    /// # }
1065    /// ```
1066    pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1067    where
1068        F: Fn(&(K, V)) + 'a,
1069    {
1070        let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1071
1072        KeyedSingleton::new(
1073            self.location.clone(),
1074            HydroNode::Inspect {
1075                f: inspect_f,
1076                input: Box::new(self.ir_node.into_inner()),
1077                metadata: self.location.new_node_metadata(Self::collection_kind()),
1078            },
1079        )
1080    }
1081
1082    /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1083    ///
1084    /// Because this method requires values to be bounded, the output [`Optional`] will only be
1085    /// asynchronously updated if a new key is added that is higher than the previous max key.
1086    ///
1087    /// # Example
1088    /// ```rust
1089    /// # #[cfg(feature = "deploy")] {
1090    /// # use hydro_lang::prelude::*;
1091    /// # use futures::StreamExt;
1092    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1093    /// let tick = process.tick();
1094    /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1095    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1096    /// #     .into_keyed()
1097    /// #     .first();
1098    /// keyed_singleton.get_max_key()
1099    /// # .sample_eager(nondet!(/** test */))
1100    /// # }, |mut stream| async move {
1101    /// // (2, 456)
1102    /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1103    /// # }));
1104    /// # }
1105    /// ```
1106    pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1107    where
1108        K: Ord,
1109    {
1110        self.entries()
1111            .assume_ordering_trusted(nondet!(
1112                /// There is only one element associated with each key, and the keys are totallly
1113                /// ordered so we will produce a deterministic value. The closure technically
1114                /// isn't commutative in the case where both passed entries have the same key
1115                /// but different values.
1116                ///
1117                /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1118                /// the two inputs do not have the same key.
1119            ))
1120            .reduce(q!(
1121                move |curr, new| {
1122                    if new.0 > curr.0 {
1123                        *curr = new;
1124                    }
1125                },
1126                idempotent = manual_proof!(/** repeated elements are ignored */)
1127            ))
1128    }
1129
1130    /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1131    /// element, the value.
1132    ///
1133    /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1134    ///
1135    /// # Example
1136    /// ```rust
1137    /// # #[cfg(feature = "deploy")] {
1138    /// # use hydro_lang::prelude::*;
1139    /// # use futures::StreamExt;
1140    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1141    /// let keyed_singleton = // { 1: 2, 2: 4 }
1142    /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1143    /// #     .into_keyed()
1144    /// #     .first();
1145    /// keyed_singleton
1146    ///     .clone()
1147    ///     .into_keyed_stream()
1148    ///     .interleave(
1149    ///         keyed_singleton.into_keyed_stream()
1150    ///     )
1151    /// #   .entries()
1152    /// # }, |mut stream| async move {
    /// // { 1: [2, 2], 2: [4, 4] }
1154    /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1155    /// #     assert_eq!(stream.next().await.unwrap(), w);
1156    /// # }
1157    /// # }));
1158    /// # }
1159    /// ```
1160    pub fn into_keyed_stream(
1161        self,
1162    ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1163        KeyedStream::new(
1164            self.location.clone(),
1165            HydroNode::Cast {
1166                inner: Box::new(self.ir_node.into_inner()),
1167                metadata: self.location.new_node_metadata(KeyedStream::<
1168                    K,
1169                    V,
1170                    L,
1171                    B::UnderlyingBound,
1172                    TotalOrder,
1173                    ExactlyOnce,
1174                >::collection_kind()),
1175            },
1176        )
1177    }
1178}
1179
1180impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1181where
1182    L: Location<'a>,
1183{
1184    /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1185    /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1186    ///
1187    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1188    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1189    /// argument that declares where the keyed singleton will be atomically processed. Batching a
1190    /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
1191    /// batching into a different [`Tick`] will introduce asynchrony.
1192    pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
1193        let out_location = Atomic { tick: tick.clone() };
1194        KeyedSingleton::new(
1195            out_location.clone(),
1196            HydroNode::BeginAtomic {
1197                inner: Box::new(self.ir_node.into_inner()),
1198                metadata: out_location
1199                    .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1200            },
1201        )
1202    }
1203}
1204
1205impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1206where
1207    L: Location<'a> + NoTick,
1208{
1209    /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1210    /// See [`KeyedSingleton::atomic`] for more details.
1211    pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1212        KeyedSingleton::new(
1213            self.location.tick.l.clone(),
1214            HydroNode::EndAtomic {
1215                inner: Box::new(self.ir_node.into_inner()),
1216                metadata: self
1217                    .location
1218                    .tick
1219                    .l
1220                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1221            },
1222        )
1223    }
1224}
1225
1226impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1227    /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1228    /// tick `T` always has the entries of `self` at tick `T - 1`.
1229    ///
1230    /// At tick `0`, the output has no entries, since there is no previous tick.
1231    ///
1232    /// This operator enables stateful iterative processing with ticks, by sending data from one
1233    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1234    ///
1235    /// # Example
1236    /// ```rust
1237    /// # #[cfg(feature = "deploy")] {
1238    /// # use hydro_lang::prelude::*;
1239    /// # use futures::StreamExt;
1240    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1241    /// let tick = process.tick();
1242    /// # // ticks are lazy by default, forces the second tick to run
1243    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1244    /// # let batch_first_tick = process
1245    /// #   .source_iter(q!(vec![(1, 2), (2, 3)]))
1246    /// #   .batch(&tick, nondet!(/** test */))
1247    /// #   .into_keyed();
1248    /// # let batch_second_tick = process
1249    /// #   .source_iter(q!(vec![(2, 4), (3, 5)]))
1250    /// #   .batch(&tick, nondet!(/** test */))
1251    /// #   .into_keyed()
1252    /// #   .defer_tick(); // appears on the second tick
1253    /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1254    /// # batch_first_tick.chain(batch_second_tick).first();
1255    /// input_batch.clone().filter_key_not_in(
1256    ///     input_batch.defer_tick().keys() // keys present in the previous tick
1257    /// )
1258    /// # .entries().all_ticks()
1259    /// # }, |mut stream| async move {
1260    /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1261    /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1262    /// #     assert_eq!(stream.next().await.unwrap(), w);
1263    /// # }
1264    /// # }));
1265    /// # }
1266    /// ```
1267    pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1268        KeyedSingleton::new(
1269            self.location.clone(),
1270            HydroNode::DeferTick {
1271                input: Box::new(self.ir_node.into_inner()),
1272                metadata: self
1273                    .location
1274                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1275            },
1276        )
1277    }
1278}
1279
1280impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1281where
1282    L: Location<'a>,
1283{
1284    /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1285    /// point in time.
1286    ///
1287    /// # Non-Determinism
1288    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1289    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1290    pub fn snapshot(
1291        self,
1292        tick: &Tick<L>,
1293        _nondet: NonDet,
1294    ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1295        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1296        KeyedSingleton::new(
1297            tick.clone(),
1298            HydroNode::Batch {
1299                inner: Box::new(self.ir_node.into_inner()),
1300                metadata: tick
1301                    .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1302            },
1303        )
1304    }
1305}
1306
1307impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1308where
1309    L: Location<'a> + NoTick,
1310{
1311    /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1312    /// state of the keyed singleton being atomically processed.
1313    ///
1314    /// # Non-Determinism
1315    /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1316    /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1317    pub fn snapshot_atomic(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1318        KeyedSingleton::new(
1319            self.location.clone().tick,
1320            HydroNode::Batch {
1321                inner: Box::new(self.ir_node.into_inner()),
1322                metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1323                    K,
1324                    V,
1325                    Tick<L>,
1326                    Bounded,
1327                >::collection_kind(
1328                )),
1329            },
1330        )
1331    }
1332}
1333
1334impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1335where
1336    L: Location<'a>,
1337{
1338    /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1339    ///
1340    /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1341    /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1342    /// is filtered out.
1343    ///
1344    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
    /// not modify or take ownership of the values. If you need to modify the values while
    /// filtering, use [`KeyedSingleton::filter_map`] instead.
1347    ///
1348    /// # Example
1349    /// ```rust
1350    /// # #[cfg(feature = "deploy")] {
1351    /// # use hydro_lang::prelude::*;
1352    /// # use futures::StreamExt;
1353    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1354    /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1355    /// # process
1356    /// #     .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1357    /// #     .into_keyed()
1358    /// #     .first();
1359    /// keyed_singleton.filter(q!(|&v| v > 1))
1360    /// #   .entries()
1361    /// # }, |mut stream| async move {
1362    /// // { 1: 2, 2: 4 }
1363    /// # let mut results = Vec::new();
1364    /// # for _ in 0..2 {
1365    /// #     results.push(stream.next().await.unwrap());
1366    /// # }
1367    /// # results.sort();
1368    /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1369    /// # }));
1370    /// # }
1371    /// ```
1372    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1373    where
1374        F: Fn(&V) -> bool + 'a,
1375    {
1376        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1377        let filter_f = q!({
1378            let orig = f;
1379            move |t: &(_, _)| orig(&t.1)
1380        })
1381        .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1382        .into();
1383
1384        KeyedSingleton::new(
1385            self.location.clone(),
1386            HydroNode::Filter {
1387                f: filter_f,
1388                input: Box::new(self.ir_node.into_inner()),
1389                metadata: self
1390                    .location
1391                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1392            },
1393        )
1394    }
1395
1396    /// An operator that both filters and maps values. It yields only the key-value pairs where
1397    /// the supplied closure `f` returns `Some(value)`.
1398    ///
1399    /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1400    /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1401    /// If it returns `None`, the key-value pair is filtered out.
1402    ///
1403    /// # Example
1404    /// ```rust
1405    /// # #[cfg(feature = "deploy")] {
1406    /// # use hydro_lang::prelude::*;
1407    /// # use futures::StreamExt;
1408    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1409    /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1410    /// # process
1411    /// #     .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1412    /// #     .into_keyed()
1413    /// #     .first();
1414    /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1415    /// #   .entries()
1416    /// # }, |mut stream| async move {
1417    /// // { 1: 42, 3: 100 }
1418    /// # let mut results = Vec::new();
1419    /// # for _ in 0..2 {
1420    /// #     results.push(stream.next().await.unwrap());
1421    /// # }
1422    /// # results.sort();
1423    /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1424    /// # }));
1425    /// # }
1426    /// ```
1427    pub fn filter_map<F, U>(
1428        self,
1429        f: impl IntoQuotedMut<'a, F, L> + Copy,
1430    ) -> KeyedSingleton<K, U, L, B>
1431    where
1432        F: Fn(V) -> Option<U> + 'a,
1433    {
1434        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1435        let filter_map_f = q!({
1436            let orig = f;
1437            move |(k, v)| orig(v).map(|o| (k, o))
1438        })
1439        .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1440        .into();
1441
1442        KeyedSingleton::new(
1443            self.location.clone(),
1444            HydroNode::FilterMap {
1445                f: filter_map_f,
1446                input: Box::new(self.ir_node.into_inner()),
1447                metadata: self
1448                    .location
1449                    .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
1450            },
1451        )
1452    }
1453
1454    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1455    /// arrived since the previous batch was released.
1456    ///
1457    /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1458    /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1459    ///
1460    /// # Non-Determinism
1461    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1462    /// has a non-deterministic set of key-value pairs.
1463    pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1464    where
1465        L: NoTick,
1466    {
1467        self.atomic(tick).batch_atomic(nondet)
1468    }
1469}
1470
1471impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1472where
1473    L: Location<'a> + NoTick,
1474{
1475    /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1476    /// atomically processed.
1477    ///
1478    /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1479    /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1480    ///
1481    /// # Non-Determinism
1482    /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1483    /// has a non-deterministic set of key-value pairs.
1484    pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1485        let _ = nondet;
1486        KeyedSingleton::new(
1487            self.location.clone().tick,
1488            HydroNode::Batch {
1489                inner: Box::new(self.ir_node.into_inner()),
1490                metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1491                    K,
1492                    V,
1493                    Tick<L>,
1494                    Bounded,
1495                >::collection_kind(
1496                )),
1497            },
1498        )
1499    }
1500}
1501
1502#[cfg(test)]
1503mod tests {
1504    #[cfg(feature = "deploy")]
1505    use futures::{SinkExt, StreamExt};
1506    #[cfg(feature = "deploy")]
1507    use hydro_deploy::Deployment;
1508    #[cfg(any(feature = "deploy", feature = "sim"))]
1509    use stageleft::q;
1510
1511    #[cfg(any(feature = "deploy", feature = "sim"))]
1512    use crate::compile::builder::FlowBuilder;
1513    #[cfg(any(feature = "deploy", feature = "sim"))]
1514    use crate::location::Location;
1515    #[cfg(any(feature = "deploy", feature = "sim"))]
1516    use crate::nondet::nondet;
1517
1518    #[cfg(feature = "deploy")]
1519    #[tokio::test]
1520    async fn key_count_bounded_value() {
1521        let mut deployment = Deployment::new();
1522
1523        let mut flow = FlowBuilder::new();
1524        let node = flow.process::<()>();
1525        let external = flow.external::<()>();
1526
1527        let (input_port, input) = node.source_external_bincode(&external);
1528        let out = input
1529            .into_keyed()
1530            .first()
1531            .key_count()
1532            .sample_eager(nondet!(/** test */))
1533            .send_bincode_external(&external);
1534
1535        let nodes = flow
1536            .with_process(&node, deployment.Localhost())
1537            .with_external(&external, deployment.Localhost())
1538            .deploy(&mut deployment);
1539
1540        deployment.deploy().await.unwrap();
1541
1542        let mut external_in = nodes.connect(input_port).await;
1543        let mut external_out = nodes.connect(out).await;
1544
1545        deployment.start().await.unwrap();
1546
1547        assert_eq!(external_out.next().await.unwrap(), 0);
1548
1549        external_in.send((1, 1)).await.unwrap();
1550        assert_eq!(external_out.next().await.unwrap(), 1);
1551
1552        external_in.send((2, 2)).await.unwrap();
1553        assert_eq!(external_out.next().await.unwrap(), 2);
1554    }
1555
1556    #[cfg(feature = "deploy")]
1557    #[tokio::test]
1558    async fn key_count_unbounded_value() {
1559        let mut deployment = Deployment::new();
1560
1561        let mut flow = FlowBuilder::new();
1562        let node = flow.process::<()>();
1563        let external = flow.external::<()>();
1564
1565        let (input_port, input) = node.source_external_bincode(&external);
1566        let out = input
1567            .into_keyed()
1568            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1569            .key_count()
1570            .sample_eager(nondet!(/** test */))
1571            .send_bincode_external(&external);
1572
1573        let nodes = flow
1574            .with_process(&node, deployment.Localhost())
1575            .with_external(&external, deployment.Localhost())
1576            .deploy(&mut deployment);
1577
1578        deployment.deploy().await.unwrap();
1579
1580        let mut external_in = nodes.connect(input_port).await;
1581        let mut external_out = nodes.connect(out).await;
1582
1583        deployment.start().await.unwrap();
1584
1585        assert_eq!(external_out.next().await.unwrap(), 0);
1586
1587        external_in.send((1, 1)).await.unwrap();
1588        assert_eq!(external_out.next().await.unwrap(), 1);
1589
1590        external_in.send((1, 2)).await.unwrap();
1591        assert_eq!(external_out.next().await.unwrap(), 1);
1592
1593        external_in.send((2, 2)).await.unwrap();
1594        assert_eq!(external_out.next().await.unwrap(), 2);
1595
1596        external_in.send((1, 1)).await.unwrap();
1597        assert_eq!(external_out.next().await.unwrap(), 2);
1598
1599        external_in.send((3, 1)).await.unwrap();
1600        assert_eq!(external_out.next().await.unwrap(), 3);
1601    }
1602
1603    #[cfg(feature = "deploy")]
1604    #[tokio::test]
1605    async fn into_singleton_bounded_value() {
1606        let mut deployment = Deployment::new();
1607
1608        let mut flow = FlowBuilder::new();
1609        let node = flow.process::<()>();
1610        let external = flow.external::<()>();
1611
1612        let (input_port, input) = node.source_external_bincode(&external);
1613        let out = input
1614            .into_keyed()
1615            .first()
1616            .into_singleton()
1617            .sample_eager(nondet!(/** test */))
1618            .send_bincode_external(&external);
1619
1620        let nodes = flow
1621            .with_process(&node, deployment.Localhost())
1622            .with_external(&external, deployment.Localhost())
1623            .deploy(&mut deployment);
1624
1625        deployment.deploy().await.unwrap();
1626
1627        let mut external_in = nodes.connect(input_port).await;
1628        let mut external_out = nodes.connect(out).await;
1629
1630        deployment.start().await.unwrap();
1631
1632        assert_eq!(
1633            external_out.next().await.unwrap(),
1634            std::collections::HashMap::new()
1635        );
1636
1637        external_in.send((1, 1)).await.unwrap();
1638        assert_eq!(
1639            external_out.next().await.unwrap(),
1640            vec![(1, 1)].into_iter().collect()
1641        );
1642
1643        external_in.send((2, 2)).await.unwrap();
1644        assert_eq!(
1645            external_out.next().await.unwrap(),
1646            vec![(1, 1), (2, 2)].into_iter().collect()
1647        );
1648    }
1649
1650    #[cfg(feature = "deploy")]
1651    #[tokio::test]
1652    async fn into_singleton_unbounded_value() {
1653        let mut deployment = Deployment::new();
1654
1655        let mut flow = FlowBuilder::new();
1656        let node = flow.process::<()>();
1657        let external = flow.external::<()>();
1658
1659        let (input_port, input) = node.source_external_bincode(&external);
1660        let out = input
1661            .into_keyed()
1662            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1663            .into_singleton()
1664            .sample_eager(nondet!(/** test */))
1665            .send_bincode_external(&external);
1666
1667        let nodes = flow
1668            .with_process(&node, deployment.Localhost())
1669            .with_external(&external, deployment.Localhost())
1670            .deploy(&mut deployment);
1671
1672        deployment.deploy().await.unwrap();
1673
1674        let mut external_in = nodes.connect(input_port).await;
1675        let mut external_out = nodes.connect(out).await;
1676
1677        deployment.start().await.unwrap();
1678
1679        assert_eq!(
1680            external_out.next().await.unwrap(),
1681            std::collections::HashMap::new()
1682        );
1683
1684        external_in.send((1, 1)).await.unwrap();
1685        assert_eq!(
1686            external_out.next().await.unwrap(),
1687            vec![(1, 1)].into_iter().collect()
1688        );
1689
1690        external_in.send((1, 2)).await.unwrap();
1691        assert_eq!(
1692            external_out.next().await.unwrap(),
1693            vec![(1, 2)].into_iter().collect()
1694        );
1695
1696        external_in.send((2, 2)).await.unwrap();
1697        assert_eq!(
1698            external_out.next().await.unwrap(),
1699            vec![(1, 2), (2, 1)].into_iter().collect()
1700        );
1701
1702        external_in.send((1, 1)).await.unwrap();
1703        assert_eq!(
1704            external_out.next().await.unwrap(),
1705            vec![(1, 3), (2, 1)].into_iter().collect()
1706        );
1707
1708        external_in.send((3, 1)).await.unwrap();
1709        assert_eq!(
1710            external_out.next().await.unwrap(),
1711            vec![(1, 3), (2, 1), (3, 1)].into_iter().collect()
1712        );
1713    }
1714
1715    #[cfg(feature = "sim")]
1716    #[test]
1717    fn sim_unbounded_singleton_snapshot() {
1718        let mut flow = FlowBuilder::new();
1719        let node = flow.process::<()>();
1720
1721        let (input_port, input) = node.sim_input();
1722        let output = input
1723            .into_keyed()
1724            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
1725            .snapshot(&node.tick(), nondet!(/** test */))
1726            .entries()
1727            .all_ticks()
1728            .sim_output();
1729
1730        let count = flow.sim().exhaustive(async || {
1731            input_port.send((1, 123));
1732            input_port.send((1, 456));
1733            input_port.send((2, 123));
1734
1735            let all = output.collect_sorted::<Vec<_>>().await;
1736            assert_eq!(all.last().unwrap(), &(2, 1));
1737        });
1738
1739        assert_eq!(count, 8);
1740    }
1741
1742    #[cfg(feature = "deploy")]
1743    #[tokio::test]
1744    async fn join_keyed_stream() {
1745        let mut deployment = Deployment::new();
1746
1747        let mut flow = FlowBuilder::new();
1748        let node = flow.process::<()>();
1749        let external = flow.external::<()>();
1750
1751        let tick = node.tick();
1752        let keyed_data = node
1753            .source_iter(q!(vec![(1, 10), (2, 20)]))
1754            .into_keyed()
1755            .batch(&tick, nondet!(/** test */))
1756            .first();
1757        let requests = node
1758            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
1759            .into_keyed()
1760            .batch(&tick, nondet!(/** test */));
1761
1762        let out = keyed_data
1763            .join_keyed_stream(requests)
1764            .entries()
1765            .all_ticks()
1766            .send_bincode_external(&external);
1767
1768        let nodes = flow
1769            .with_process(&node, deployment.Localhost())
1770            .with_external(&external, deployment.Localhost())
1771            .deploy(&mut deployment);
1772
1773        deployment.deploy().await.unwrap();
1774
1775        let mut external_out = nodes.connect(out).await;
1776
1777        deployment.start().await.unwrap();
1778
1779        let mut results = vec![];
1780        for _ in 0..2 {
1781            results.push(external_out.next().await.unwrap());
1782        }
1783        results.sort();
1784
1785        assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
1786    }
1787}