hydro_lang/live_collections/keyed_singleton.rs
1//! Definitions for the [`KeyedSingleton`] live collection.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::hash::Hash;
6use std::marker::PhantomData;
7use std::ops::Deref;
8use std::rc::Rc;
9
10use stageleft::{IntoQuotedMut, QuotedWithContext, q};
11
12use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
13use super::keyed_stream::KeyedStream;
14use super::optional::Optional;
15use super::singleton::Singleton;
16use super::stream::{ExactlyOnce, NoOrder, Stream, TotalOrder};
17use crate::compile::builder::CycleId;
18use crate::compile::ir::{
19 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, KeyedSingletonBoundKind, TeeNode,
20};
21#[cfg(stageleft_runtime)]
22use crate::forward_handle::{CycleCollection, ReceiverComplete};
23use crate::forward_handle::{ForwardRef, TickCycle};
24use crate::live_collections::stream::{Ordering, Retries};
25#[cfg(stageleft_runtime)]
26use crate::location::dynamic::{DynLocation, LocationId};
27use crate::location::tick::DeferTick;
28use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
29use crate::manual_expr::ManualExpr;
30use crate::nondet::{NonDet, nondet};
31use crate::properties::manual_proof;
32
33/// A marker trait indicating which components of a [`KeyedSingleton`] may change.
34///
35/// In addition to [`Bounded`] (all entries are fixed) and [`Unbounded`] (entries may be added /
36/// changed, but not removed), this also includes an additional variant [`BoundedValue`], which
37/// indicates that entries may be added over time, but once an entry is added it will never be
38/// removed and its value will never change.
39pub trait KeyedSingletonBound {
40 /// The [`Boundedness`] of the [`Stream`] underlying the keyed singleton.
41 type UnderlyingBound: Boundedness;
42 /// The [`Boundedness`] of each entry's value; [`Bounded`] means it is immutable.
43 type ValueBound: Boundedness;
44
45 /// The type of the keyed singleton if the value for each key is immutable.
46 type WithBoundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Bounded>;
47
48 /// The type of the keyed singleton if the value for each key may change asynchronously.
49 type WithUnboundedValue: KeyedSingletonBound<UnderlyingBound = Self::UnderlyingBound, ValueBound = Unbounded>;
50
51 /// Returns the [`KeyedSingletonBoundKind`] corresponding to this type.
52 fn bound_kind() -> KeyedSingletonBoundKind;
53}
54
55impl KeyedSingletonBound for Unbounded {
56 type UnderlyingBound = Unbounded;
57 type ValueBound = Unbounded;
58 type WithBoundedValue = BoundedValue;
59 type WithUnboundedValue = Unbounded;
60
61 fn bound_kind() -> KeyedSingletonBoundKind {
62 KeyedSingletonBoundKind::Unbounded
63 }
64}
65
66impl KeyedSingletonBound for Bounded {
67 type UnderlyingBound = Bounded;
68 type ValueBound = Bounded;
69 type WithBoundedValue = Bounded;
70 type WithUnboundedValue = UnreachableBound;
71
72 fn bound_kind() -> KeyedSingletonBoundKind {
73 KeyedSingletonBoundKind::Bounded
74 }
75}
76
77/// A variation of boundedness specific to [`KeyedSingleton`], which indicates that once a key appears,
78/// its value is bounded and will never change. If the `KeyBound` is [`Bounded`], then the entire set of entries
79/// is bounded, but if it is [`Unbounded`], then new entries may appear asynchronously.
80pub struct BoundedValue;
81
82impl KeyedSingletonBound for BoundedValue {
83 type UnderlyingBound = Unbounded;
84 type ValueBound = Bounded;
85 type WithBoundedValue = BoundedValue;
86 type WithUnboundedValue = Unbounded;
87
88 fn bound_kind() -> KeyedSingletonBoundKind {
89 KeyedSingletonBoundKind::BoundedValue
90 }
91}
92
93#[doc(hidden)]
94pub struct UnreachableBound;
95
96impl KeyedSingletonBound for UnreachableBound {
97 type UnderlyingBound = Bounded;
98 type ValueBound = Unbounded;
99
100 type WithBoundedValue = Bounded;
101 type WithUnboundedValue = UnreachableBound;
102
103 fn bound_kind() -> KeyedSingletonBoundKind {
104 unreachable!("UnreachableBound cannot be instantiated")
105 }
106}
107
108/// Mapping from keys of type `K` to values of type `V`.
109///
110/// Keyed Singletons capture an asynchronously updated mapping from keys of the `K` to values of
111/// type `V`, where the order of keys is non-deterministic. In addition to the standard boundedness
112/// variants ([`Bounded`] for finite and immutable, [`Unbounded`] for asynchronously changing),
113/// keyed singletons can use [`BoundedValue`] to declare that new keys may be added over time, but
114/// keys cannot be removed and the value for each key is immutable.
115///
116/// Type Parameters:
117/// - `K`: the type of the key for each entry
118/// - `V`: the type of the value for each entry
119/// - `Loc`: the [`Location`] where the keyed singleton is materialized
120/// - `Bound`: tracks whether the entries are:
121/// - [`Bounded`] (local and finite)
122/// - [`Unbounded`] (asynchronous with entries added / removed / changed over time)
123/// - [`BoundedValue`] (asynchronous with immutable values for each key and no removals)
124pub struct KeyedSingleton<K, V, Loc, Bound: KeyedSingletonBound> {
125 pub(crate) location: Loc,
126 pub(crate) ir_node: RefCell<HydroNode>,
127
128 _phantom: PhantomData<(K, V, Loc, Bound)>,
129}
130
131impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: KeyedSingletonBound> Clone
132 for KeyedSingleton<K, V, Loc, Bound>
133{
134 fn clone(&self) -> Self {
135 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
136 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
137 *self.ir_node.borrow_mut() = HydroNode::Tee {
138 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
139 metadata: self.location.new_node_metadata(Self::collection_kind()),
140 };
141 }
142
143 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
144 KeyedSingleton {
145 location: self.location.clone(),
146 ir_node: HydroNode::Tee {
147 inner: TeeNode(inner.0.clone()),
148 metadata: metadata.clone(),
149 }
150 .into(),
151 _phantom: PhantomData,
152 }
153 } else {
154 unreachable!()
155 }
156 }
157}
158
159impl<'a, K, V, L, B: KeyedSingletonBound> CycleCollection<'a, ForwardRef>
160 for KeyedSingleton<K, V, L, B>
161where
162 L: Location<'a> + NoTick,
163{
164 type Location = L;
165
166 fn create_source(cycle_id: CycleId, location: L) -> Self {
167 KeyedSingleton {
168 location: location.clone(),
169 ir_node: RefCell::new(HydroNode::CycleSource {
170 cycle_id,
171 metadata: location.new_node_metadata(Self::collection_kind()),
172 }),
173 _phantom: PhantomData,
174 }
175 }
176}
177
178impl<'a, K, V, L> CycleCollection<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
179where
180 L: Location<'a>,
181{
182 type Location = Tick<L>;
183
184 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
185 KeyedSingleton::new(
186 location.clone(),
187 HydroNode::CycleSource {
188 cycle_id,
189 metadata: location.new_node_metadata(Self::collection_kind()),
190 },
191 )
192 }
193}
194
195impl<'a, K, V, L> DeferTick for KeyedSingleton<K, V, Tick<L>, Bounded>
196where
197 L: Location<'a>,
198{
199 fn defer_tick(self) -> Self {
200 KeyedSingleton::defer_tick(self)
201 }
202}
203
204impl<'a, K, V, L, B: KeyedSingletonBound> ReceiverComplete<'a, ForwardRef>
205 for KeyedSingleton<K, V, L, B>
206where
207 L: Location<'a> + NoTick,
208{
209 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
210 assert_eq!(
211 Location::id(&self.location),
212 expected_location,
213 "locations do not match"
214 );
215 self.location
216 .flow_state()
217 .borrow_mut()
218 .push_root(HydroRoot::CycleSink {
219 cycle_id,
220 input: Box::new(self.ir_node.into_inner()),
221 op_metadata: HydroIrOpMetadata::new(),
222 });
223 }
224}
225
226impl<'a, K, V, L> ReceiverComplete<'a, TickCycle> for KeyedSingleton<K, V, Tick<L>, Bounded>
227where
228 L: Location<'a>,
229{
230 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
231 assert_eq!(
232 Location::id(&self.location),
233 expected_location,
234 "locations do not match"
235 );
236 self.location
237 .flow_state()
238 .borrow_mut()
239 .push_root(HydroRoot::CycleSink {
240 cycle_id,
241 input: Box::new(self.ir_node.into_inner()),
242 op_metadata: HydroIrOpMetadata::new(),
243 });
244 }
245}
246
247impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
248 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
249 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
250 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
251
252 KeyedSingleton {
253 location,
254 ir_node: RefCell::new(ir_node),
255 _phantom: PhantomData,
256 }
257 }
258
259 /// Returns the [`Location`] where this keyed singleton is being materialized.
260 pub fn location(&self) -> &L {
261 &self.location
262 }
263}
264
/// Counts the entries of a bounded keyed singleton by flattening it to its entry stream.
#[cfg(stageleft_runtime)]
fn key_count_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<usize, L, Bounded> {
    let entries = me.entries();
    entries.count()
}
271
/// Collects a bounded keyed singleton into a single `HashMap` by folding over its entries.
#[cfg(stageleft_runtime)]
fn into_singleton_inside_tick<'a, K, V, L: Location<'a>>(
    me: KeyedSingleton<K, V, L, Bounded>,
) -> Singleton<HashMap<K, V>, L, Bounded>
where
    K: Eq + Hash,
{
    let ordered_entries = me.entries().assume_ordering(nondet!(
        /// Because this is a keyed singleton, there is only one value per key.
    ));

    ordered_entries.fold(
        q!(|| HashMap::new()),
        q!(|map, (k, v)| {
            map.insert(k, v);
        }),
    )
}
290
291impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B> {
292 pub(crate) fn collection_kind() -> CollectionKind {
293 CollectionKind::KeyedSingleton {
294 bound: B::bound_kind(),
295 key_type: stageleft::quote_type::<K>().into(),
296 value_type: stageleft::quote_type::<V>().into(),
297 }
298 }
299
300 /// Transforms each value by invoking `f` on each element, with keys staying the same
301 /// after transformation. If you need access to the key, see [`KeyedSingleton::map_with_key`].
302 ///
303 /// If you do not want to modify the stream and instead only want to view
304 /// each item use [`KeyedSingleton::inspect`] instead.
305 ///
306 /// # Example
307 /// ```rust
308 /// # #[cfg(feature = "deploy")] {
309 /// # use hydro_lang::prelude::*;
310 /// # use futures::StreamExt;
311 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
312 /// let keyed_singleton = // { 1: 2, 2: 4 }
313 /// # process
314 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
315 /// # .into_keyed()
316 /// # .first();
317 /// keyed_singleton.map(q!(|v| v + 1))
318 /// # .entries()
319 /// # }, |mut stream| async move {
320 /// // { 1: 3, 2: 5 }
321 /// # let mut results = Vec::new();
322 /// # for _ in 0..2 {
323 /// # results.push(stream.next().await.unwrap());
324 /// # }
325 /// # results.sort();
326 /// # assert_eq!(results, vec![(1, 3), (2, 5)]);
327 /// # }));
328 /// # }
329 /// ```
330 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, U, L, B>
331 where
332 F: Fn(V) -> U + 'a,
333 {
334 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
335 let map_f = q!({
336 let orig = f;
337 move |(k, v)| (k, orig(v))
338 })
339 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
340 .into();
341
342 KeyedSingleton::new(
343 self.location.clone(),
344 HydroNode::Map {
345 f: map_f,
346 input: Box::new(self.ir_node.into_inner()),
347 metadata: self
348 .location
349 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
350 },
351 )
352 }
353
354 /// Transforms each value by invoking `f` on each key-value pair, with keys staying the same
355 /// after transformation. Unlike [`KeyedSingleton::map`], this gives access to both the key and value.
356 ///
357 /// The closure `f` receives a tuple `(K, V)` containing both the key and value, and returns
358 /// the new value `U`. The key remains unchanged in the output.
359 ///
360 /// # Example
361 /// ```rust
362 /// # #[cfg(feature = "deploy")] {
363 /// # use hydro_lang::prelude::*;
364 /// # use futures::StreamExt;
365 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
366 /// let keyed_singleton = // { 1: 2, 2: 4 }
367 /// # process
368 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
369 /// # .into_keyed()
370 /// # .first();
371 /// keyed_singleton.map_with_key(q!(|(k, v)| k + v))
372 /// # .entries()
373 /// # }, |mut stream| async move {
374 /// // { 1: 3, 2: 6 }
375 /// # let mut results = Vec::new();
376 /// # for _ in 0..2 {
377 /// # results.push(stream.next().await.unwrap());
378 /// # }
379 /// # results.sort();
380 /// # assert_eq!(results, vec![(1, 3), (2, 6)]);
381 /// # }));
382 /// # }
383 /// ```
384 pub fn map_with_key<U, F>(
385 self,
386 f: impl IntoQuotedMut<'a, F, L> + Copy,
387 ) -> KeyedSingleton<K, U, L, B>
388 where
389 F: Fn((K, V)) -> U + 'a,
390 K: Clone,
391 {
392 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
393 let map_f = q!({
394 let orig = f;
395 move |(k, v)| {
396 let out = orig((Clone::clone(&k), v));
397 (k, out)
398 }
399 })
400 .splice_fn1_ctx::<(K, V), (K, U)>(&self.location)
401 .into();
402
403 KeyedSingleton::new(
404 self.location.clone(),
405 HydroNode::Map {
406 f: map_f,
407 input: Box::new(self.ir_node.into_inner()),
408 metadata: self
409 .location
410 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
411 },
412 )
413 }
414
415 /// Gets the number of keys in the keyed singleton.
416 ///
417 /// The output singleton will be unbounded if the input is [`Unbounded`] or [`BoundedValue`],
418 /// since keys may be added / removed over time. When the set of keys changes, the count will
419 /// be asynchronously updated.
420 ///
421 /// # Example
422 /// ```rust
423 /// # #[cfg(feature = "deploy")] {
424 /// # use hydro_lang::prelude::*;
425 /// # use futures::StreamExt;
426 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
427 /// # let tick = process.tick();
428 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
429 /// # process
430 /// # .source_iter(q!(vec![(1, "a"), (2, "b"), (3, "c")]))
431 /// # .into_keyed()
432 /// # .batch(&tick, nondet!(/** test */))
433 /// # .first();
434 /// keyed_singleton.key_count()
435 /// # .all_ticks()
436 /// # }, |mut stream| async move {
437 /// // 3
438 /// # assert_eq!(stream.next().await.unwrap(), 3);
439 /// # }));
440 /// # }
441 /// ```
442 pub fn key_count(self) -> Singleton<usize, L, B::UnderlyingBound> {
443 if B::ValueBound::BOUNDED {
444 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
445 location: self.location,
446 ir_node: self.ir_node,
447 _phantom: PhantomData,
448 };
449
450 me.entries().count()
451 } else if L::is_top_level()
452 && let Some(tick) = self.location.try_tick()
453 {
454 let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
455 location: self.location,
456 ir_node: self.ir_node,
457 _phantom: PhantomData,
458 };
459
460 let out =
461 key_count_inside_tick(me.snapshot(&tick, nondet!(/** eventually stabilizes */)))
462 .latest();
463 Singleton::new(out.location, out.ir_node.into_inner())
464 } else {
465 panic!("Unbounded KeyedSingleton inside a tick");
466 }
467 }
468
469 /// Converts this keyed singleton into a [`Singleton`] containing a `HashMap` from keys to values.
470 ///
471 /// As the values for each key are updated asynchronously, the `HashMap` will be updated
472 /// asynchronously as well.
473 ///
474 /// # Example
475 /// ```rust
476 /// # #[cfg(feature = "deploy")] {
477 /// # use hydro_lang::prelude::*;
478 /// # use futures::StreamExt;
479 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
480 /// let keyed_singleton = // { 1: "a", 2: "b", 3: "c" }
481 /// # process
482 /// # .source_iter(q!(vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())]))
483 /// # .into_keyed()
484 /// # .batch(&process.tick(), nondet!(/** test */))
485 /// # .first();
486 /// keyed_singleton.into_singleton()
487 /// # .all_ticks()
488 /// # }, |mut stream| async move {
489 /// // { 1: "a", 2: "b", 3: "c" }
490 /// # assert_eq!(stream.next().await.unwrap(), vec![(1, "a".to_owned()), (2, "b".to_owned()), (3, "c".to_owned())].into_iter().collect());
491 /// # }));
492 /// # }
493 /// ```
494 pub fn into_singleton(self) -> Singleton<HashMap<K, V>, L, B::UnderlyingBound>
495 where
496 K: Eq + Hash,
497 {
498 if B::ValueBound::BOUNDED {
499 let me: KeyedSingleton<K, V, L, B::WithBoundedValue> = KeyedSingleton {
500 location: self.location,
501 ir_node: self.ir_node,
502 _phantom: PhantomData,
503 };
504
505 me.entries()
506 .assume_ordering(nondet!(
507 /// Because this is a keyed singleton, there is only one value per key.
508 ))
509 .fold(
510 q!(|| HashMap::new()),
511 q!(|map, (k, v)| {
512 // TODO(shadaj): make this commutative but really-debug-assert that there is no key overlap
513 map.insert(k, v);
514 }),
515 )
516 } else if L::is_top_level()
517 && let Some(tick) = self.location.try_tick()
518 {
519 let me: KeyedSingleton<K, V, L, B::WithUnboundedValue> = KeyedSingleton {
520 location: self.location,
521 ir_node: self.ir_node,
522 _phantom: PhantomData,
523 };
524
525 let out = into_singleton_inside_tick(
526 me.snapshot(&tick, nondet!(/** eventually stabilizes */)),
527 )
528 .latest();
529 Singleton::new(out.location, out.ir_node.into_inner())
530 } else {
531 panic!("Unbounded KeyedSingleton inside a tick");
532 }
533 }
534
535 /// An operator which allows you to "name" a `HydroNode`.
536 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
537 pub fn ir_node_named(self, name: &str) -> KeyedSingleton<K, V, L, B> {
538 {
539 let mut node = self.ir_node.borrow_mut();
540 let metadata = node.metadata_mut();
541 metadata.tag = Some(name.to_owned());
542 }
543 self
544 }
545
546 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
547 /// implies that `B == Bounded`.
548 pub fn make_bounded(self) -> KeyedSingleton<K, V, L, Bounded>
549 where
550 B: IsBounded,
551 {
552 KeyedSingleton::new(self.location, self.ir_node.into_inner())
553 }
554
555 /// Gets the value associated with a specific key from the keyed singleton.
556 /// Returns `None` if the key is `None` or there is no associated value.
557 ///
558 /// # Example
559 /// ```rust
560 /// # #[cfg(feature = "deploy")] {
561 /// # use hydro_lang::prelude::*;
562 /// # use futures::StreamExt;
563 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
564 /// let tick = process.tick();
565 /// let keyed_data = process
566 /// .source_iter(q!(vec![(1, 2), (2, 3)]))
567 /// .into_keyed()
568 /// .batch(&tick, nondet!(/** test */))
569 /// .first();
570 /// let key = tick.singleton(q!(1));
571 /// keyed_data.get(key).all_ticks()
572 /// # }, |mut stream| async move {
573 /// // 2
574 /// # assert_eq!(stream.next().await.unwrap(), 2);
575 /// # }));
576 /// # }
577 /// ```
578 pub fn get(self, key: impl Into<Optional<K, L, Bounded>>) -> Optional<V, L, Bounded>
579 where
580 B: IsBounded,
581 K: Hash + Eq,
582 {
583 self.make_bounded()
584 .into_keyed_stream()
585 .get(key)
586 .assume_ordering::<TotalOrder>(nondet!(/** only a single key, so totally ordered */))
587 .first()
588 }
589
590 /// Emit a keyed stream containing keys shared between the keyed singleton and the
591 /// keyed stream, where each value in the output keyed stream is a tuple of
592 /// (the keyed singleton's value, the keyed stream's value).
593 ///
594 /// # Example
595 /// ```rust
596 /// # #[cfg(feature = "deploy")] {
597 /// # use hydro_lang::prelude::*;
598 /// # use futures::StreamExt;
599 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
600 /// let tick = process.tick();
601 /// let keyed_data = process
602 /// .source_iter(q!(vec![(1, 10), (2, 20)]))
603 /// .into_keyed()
604 /// .batch(&tick, nondet!(/** test */))
605 /// .first();
606 /// let other_data = process
607 /// .source_iter(q!(vec![(1, 100), (2, 200), (1, 101)]))
608 /// .into_keyed()
609 /// .batch(&tick, nondet!(/** test */));
610 /// keyed_data.join_keyed_stream(other_data).entries().all_ticks()
611 /// # }, |mut stream| async move {
612 /// // { 1: [(10, 100), (10, 101)], 2: [(20, 200)] } in any order
613 /// # let mut results = vec![];
614 /// # for _ in 0..3 {
615 /// # results.push(stream.next().await.unwrap());
616 /// # }
617 /// # results.sort();
618 /// # assert_eq!(results, vec![(1, (10, 100)), (1, (10, 101)), (2, (20, 200))]);
619 /// # }));
620 /// # }
621 /// ```
622 pub fn join_keyed_stream<O2: Ordering, R2: Retries, V2>(
623 self,
624 keyed_stream: KeyedStream<K, V2, L, Bounded, O2, R2>,
625 ) -> KeyedStream<K, (V, V2), L, Bounded, NoOrder, R2>
626 where
627 B: IsBounded,
628 K: Eq + Hash,
629 {
630 self.make_bounded()
631 .entries()
632 .weaken_retries::<R2>()
633 .join(keyed_stream.entries())
634 .into_keyed()
635 }
636
637 /// Emit a keyed singleton containing all keys shared between two keyed singletons,
638 /// where each value in the output keyed singleton is a tuple of
639 /// (self.value, other.value).
640 ///
641 /// # Example
642 /// ```rust
643 /// # #[cfg(feature = "deploy")] {
644 /// # use hydro_lang::prelude::*;
645 /// # use futures::StreamExt;
646 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
647 /// # let tick = process.tick();
648 /// let requests = // { 1: 10, 2: 20, 3: 30 }
649 /// # process
650 /// # .source_iter(q!(vec![(1, 10), (2, 20), (3, 30)]))
651 /// # .into_keyed()
652 /// # .batch(&tick, nondet!(/** test */))
653 /// # .first();
654 /// let other = // { 1: 100, 2: 200, 4: 400 }
655 /// # process
656 /// # .source_iter(q!(vec![(1, 100), (2, 200), (4, 400)]))
657 /// # .into_keyed()
658 /// # .batch(&tick, nondet!(/** test */))
659 /// # .first();
660 /// requests.join_keyed_singleton(other)
661 /// # .entries().all_ticks()
662 /// # }, |mut stream| async move {
663 /// // { 1: (10, 100), 2: (20, 200) }
664 /// # let mut results = vec![];
665 /// # for _ in 0..2 {
666 /// # results.push(stream.next().await.unwrap());
667 /// # }
668 /// # results.sort();
669 /// # assert_eq!(results, vec![(1, (10, 100)), (2, (20, 200))]);
670 /// # }));
671 /// # }
672 /// ```
673 pub fn join_keyed_singleton<V2: Clone>(
674 self,
675 other: KeyedSingleton<K, V2, L, Bounded>,
676 ) -> KeyedSingleton<K, (V, V2), L, Bounded>
677 where
678 B: IsBounded,
679 K: Eq + Hash,
680 {
681 let result_stream = self
682 .make_bounded()
683 .entries()
684 .join(other.entries())
685 .into_keyed();
686
687 // The cast is guaranteed to succeed, since each key (in both `self` and `other`) has at most one value.
688 KeyedSingleton::new(
689 result_stream.location.clone(),
690 HydroNode::Cast {
691 inner: Box::new(result_stream.ir_node.into_inner()),
692 metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
693 K,
694 (V, V2),
695 L,
696 Bounded,
697 >::collection_kind(
698 )),
699 },
700 )
701 }
702
703 /// For each value in `self`, find the matching key in `lookup`.
704 /// The output is a keyed singleton with the key from `self`, and a value
705 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
706 /// If the key is not present in `lookup`, the option will be [`None`].
707 ///
708 /// # Example
709 /// ```rust
710 /// # #[cfg(feature = "deploy")] {
711 /// # use hydro_lang::prelude::*;
712 /// # use futures::StreamExt;
713 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
714 /// # let tick = process.tick();
715 /// let requests = // { 1: 10, 2: 20 }
716 /// # process
717 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
718 /// # .into_keyed()
719 /// # .batch(&tick, nondet!(/** test */))
720 /// # .first();
721 /// let other_data = // { 10: 100, 11: 110 }
722 /// # process
723 /// # .source_iter(q!(vec![(10, 100), (11, 110)]))
724 /// # .into_keyed()
725 /// # .batch(&tick, nondet!(/** test */))
726 /// # .first();
727 /// requests.lookup_keyed_singleton(other_data)
728 /// # .entries().all_ticks()
729 /// # }, |mut stream| async move {
730 /// // { 1: (10, Some(100)), 2: (20, None) }
731 /// # let mut results = vec![];
732 /// # for _ in 0..2 {
733 /// # results.push(stream.next().await.unwrap());
734 /// # }
735 /// # results.sort();
736 /// # assert_eq!(results, vec![(1, (10, Some(100))), (2, (20, None))]);
737 /// # }));
738 /// # }
739 /// ```
740 pub fn lookup_keyed_singleton<V2>(
741 self,
742 lookup: KeyedSingleton<V, V2, L, Bounded>,
743 ) -> KeyedSingleton<K, (V, Option<V2>), L, Bounded>
744 where
745 B: IsBounded,
746 K: Eq + Hash + Clone,
747 V: Eq + Hash + Clone,
748 V2: Clone,
749 {
750 let result_stream = self
751 .make_bounded()
752 .into_keyed_stream()
753 .lookup_keyed_stream(lookup.into_keyed_stream());
754
755 // The cast is guaranteed to succeed since both lookup and self contain at most 1 value per key
756 KeyedSingleton::new(
757 result_stream.location.clone(),
758 HydroNode::Cast {
759 inner: Box::new(result_stream.ir_node.into_inner()),
760 metadata: result_stream.location.new_node_metadata(KeyedSingleton::<
761 K,
762 (V, Option<V2>),
763 L,
764 Bounded,
765 >::collection_kind(
766 )),
767 },
768 )
769 }
770
771 /// For each value in `self`, find the matching key in `lookup`.
772 /// The output is a keyed stream with the key from `self`, and a value
773 /// that is a tuple of (`self`'s value, Option<`lookup`'s value>).
774 /// If the key is not present in `lookup`, the option will be [`None`].
775 ///
776 /// # Example
777 /// ```rust
778 /// # #[cfg(feature = "deploy")] {
779 /// # use hydro_lang::prelude::*;
780 /// # use futures::StreamExt;
781 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
782 /// # let tick = process.tick();
783 /// let requests = // { 1: 10, 2: 20 }
784 /// # process
785 /// # .source_iter(q!(vec![(1, 10), (2, 20)]))
786 /// # .into_keyed()
787 /// # .batch(&tick, nondet!(/** test */))
788 /// # .first();
789 /// let other_data = // { 10: 100, 10: 110 }
790 /// # process
791 /// # .source_iter(q!(vec![(10, 100), (10, 110)]))
792 /// # .into_keyed()
793 /// # .batch(&tick, nondet!(/** test */));
794 /// requests.lookup_keyed_stream(other_data)
795 /// # .entries().all_ticks()
796 /// # }, |mut stream| async move {
797 /// // { 1: [(10, Some(100)), (10, Some(110))], 2: (20, None) }
798 /// # let mut results = vec![];
799 /// # for _ in 0..3 {
800 /// # results.push(stream.next().await.unwrap());
801 /// # }
802 /// # results.sort();
803 /// # assert_eq!(results, vec![(1, (10, Some(100))), (1, (10, Some(110))), (2, (20, None))]);
804 /// # }));
805 /// # }
806 /// ```
807 pub fn lookup_keyed_stream<V2, O: Ordering, R: Retries>(
808 self,
809 lookup: KeyedStream<V, V2, L, Bounded, O, R>,
810 ) -> KeyedStream<K, (V, Option<V2>), L, Bounded, NoOrder, R>
811 where
812 B: IsBounded,
813 K: Eq + Hash + Clone,
814 V: Eq + Hash + Clone,
815 V2: Clone,
816 {
817 self.make_bounded()
818 .entries()
819 .weaken_retries::<R>() // TODO: Once weaken_retries() is implemented for KeyedSingleton, remove entries() and into_keyed()
820 .into_keyed()
821 .lookup_keyed_stream(lookup)
822 }
823}
824
825impl<'a, K, V, L: Location<'a>, B: KeyedSingletonBound<ValueBound = Bounded>>
826 KeyedSingleton<K, V, L, B>
827{
828 /// Flattens the keyed singleton into an unordered stream of key-value pairs.
829 ///
830 /// The value for each key must be bounded, otherwise the resulting stream elements would be
831 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
832 /// into the output.
833 ///
834 /// # Example
835 /// ```rust
836 /// # #[cfg(feature = "deploy")] {
837 /// # use hydro_lang::prelude::*;
838 /// # use futures::StreamExt;
839 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
840 /// let keyed_singleton = // { 1: 2, 2: 4 }
841 /// # process
842 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
843 /// # .into_keyed()
844 /// # .first();
845 /// keyed_singleton.entries()
846 /// # }, |mut stream| async move {
847 /// // (1, 2), (2, 4) in any order
848 /// # let mut results = Vec::new();
849 /// # for _ in 0..2 {
850 /// # results.push(stream.next().await.unwrap());
851 /// # }
852 /// # results.sort();
853 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
854 /// # }));
855 /// # }
856 /// ```
857 pub fn entries(self) -> Stream<(K, V), L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
858 self.into_keyed_stream().entries()
859 }
860
861 /// Flattens the keyed singleton into an unordered stream of just the values.
862 ///
863 /// The value for each key must be bounded, otherwise the resulting stream elements would be
864 /// non-deterministic. As new entries are added to the keyed singleton, they will be streamed
865 /// into the output.
866 ///
867 /// # Example
868 /// ```rust
869 /// # #[cfg(feature = "deploy")] {
870 /// # use hydro_lang::prelude::*;
871 /// # use futures::StreamExt;
872 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
873 /// let keyed_singleton = // { 1: 2, 2: 4 }
874 /// # process
875 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
876 /// # .into_keyed()
877 /// # .first();
878 /// keyed_singleton.values()
879 /// # }, |mut stream| async move {
880 /// // 2, 4 in any order
881 /// # let mut results = Vec::new();
882 /// # for _ in 0..2 {
883 /// # results.push(stream.next().await.unwrap());
884 /// # }
885 /// # results.sort();
886 /// # assert_eq!(results, vec![2, 4]);
887 /// # }));
888 /// # }
889 /// ```
890 pub fn values(self) -> Stream<V, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
891 let map_f = q!(|(_, v)| v)
892 .splice_fn1_ctx::<(K, V), V>(&self.location)
893 .into();
894
895 Stream::new(
896 self.location.clone(),
897 HydroNode::Map {
898 f: map_f,
899 input: Box::new(self.ir_node.into_inner()),
900 metadata: self.location.new_node_metadata(Stream::<
901 V,
902 L,
903 B::UnderlyingBound,
904 NoOrder,
905 ExactlyOnce,
906 >::collection_kind()),
907 },
908 )
909 }
910
911 /// Flattens the keyed singleton into an unordered stream of just the keys.
912 ///
913 /// The value for each key must be bounded, otherwise the removal of keys would result in
914 /// non-determinism. As new entries are added to the keyed singleton, they will be streamed
915 /// into the output.
916 ///
917 /// # Example
918 /// ```rust
919 /// # #[cfg(feature = "deploy")] {
920 /// # use hydro_lang::prelude::*;
921 /// # use futures::StreamExt;
922 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
923 /// let keyed_singleton = // { 1: 2, 2: 4 }
924 /// # process
925 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
926 /// # .into_keyed()
927 /// # .first();
928 /// keyed_singleton.keys()
929 /// # }, |mut stream| async move {
930 /// // 1, 2 in any order
931 /// # let mut results = Vec::new();
932 /// # for _ in 0..2 {
933 /// # results.push(stream.next().await.unwrap());
934 /// # }
935 /// # results.sort();
936 /// # assert_eq!(results, vec![1, 2]);
937 /// # }));
938 /// # }
939 /// ```
940 pub fn keys(self) -> Stream<K, L, B::UnderlyingBound, NoOrder, ExactlyOnce> {
941 self.entries().map(q!(|(k, _)| k))
942 }
943
944 /// Given a bounded stream of keys `K`, returns a new keyed singleton containing only the
945 /// entries whose keys are not in the provided stream.
946 ///
947 /// # Example
948 /// ```rust
949 /// # #[cfg(feature = "deploy")] {
950 /// # use hydro_lang::prelude::*;
951 /// # use futures::StreamExt;
952 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
953 /// let tick = process.tick();
954 /// let keyed_singleton = // { 1: 2, 2: 4 }
955 /// # process
956 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
957 /// # .into_keyed()
958 /// # .first()
959 /// # .batch(&tick, nondet!(/** test */));
960 /// let keys_to_remove = process
961 /// .source_iter(q!(vec![1]))
962 /// .batch(&tick, nondet!(/** test */));
963 /// keyed_singleton.filter_key_not_in(keys_to_remove)
964 /// # .entries().all_ticks()
965 /// # }, |mut stream| async move {
966 /// // { 2: 4 }
967 /// # for w in vec![(2, 4)] {
968 /// # assert_eq!(stream.next().await.unwrap(), w);
969 /// # }
970 /// # }));
971 /// # }
972 /// ```
973 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
974 self,
975 other: Stream<K, L, Bounded, O2, R2>,
976 ) -> Self
977 where
978 K: Hash + Eq,
979 {
980 check_matching_location(&self.location, &other.location);
981
982 KeyedSingleton::new(
983 self.location.clone(),
984 HydroNode::AntiJoin {
985 pos: Box::new(self.ir_node.into_inner()),
986 neg: Box::new(other.ir_node.into_inner()),
987 metadata: self.location.new_node_metadata(Self::collection_kind()),
988 },
989 )
990 }
991
992 /// An operator which allows you to "inspect" each value of a keyed singleton without
993 /// modifying it. The closure `f` is called on a reference to each value. This is
994 /// mainly useful for debugging, and should not be used to generate side-effects.
995 ///
996 /// # Example
997 /// ```rust
998 /// # #[cfg(feature = "deploy")] {
999 /// # use hydro_lang::prelude::*;
1000 /// # use futures::StreamExt;
1001 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1002 /// let keyed_singleton = // { 1: 2, 2: 4 }
1003 /// # process
1004 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1005 /// # .into_keyed()
1006 /// # .first();
1007 /// keyed_singleton
1008 /// .inspect(q!(|v| println!("{}", v)))
1009 /// # .entries()
1010 /// # }, |mut stream| async move {
1011 /// // { 1: 2, 2: 4 }
1012 /// # for w in vec![(1, 2), (2, 4)] {
1013 /// # assert_eq!(stream.next().await.unwrap(), w);
1014 /// # }
1015 /// # }));
1016 /// # }
1017 /// ```
1018 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> Self
1019 where
1020 F: Fn(&V) + 'a,
1021 {
1022 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1023 let inspect_f = q!({
1024 let orig = f;
1025 move |t: &(_, _)| orig(&t.1)
1026 })
1027 .splice_fn1_borrow_ctx::<(K, V), ()>(&self.location)
1028 .into();
1029
1030 KeyedSingleton::new(
1031 self.location.clone(),
1032 HydroNode::Inspect {
1033 f: inspect_f,
1034 input: Box::new(self.ir_node.into_inner()),
1035 metadata: self.location.new_node_metadata(Self::collection_kind()),
1036 },
1037 )
1038 }
1039
1040 /// An operator which allows you to "inspect" each entry of a keyed singleton without
1041 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
1042 /// mainly useful for debugging, and should not be used to generate side-effects.
1043 ///
1044 /// # Example
1045 /// ```rust
1046 /// # #[cfg(feature = "deploy")] {
1047 /// # use hydro_lang::prelude::*;
1048 /// # use futures::StreamExt;
1049 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1050 /// let keyed_singleton = // { 1: 2, 2: 4 }
1051 /// # process
1052 /// # .source_iter(q!(vec![(1, 2), (2, 4)]))
1053 /// # .into_keyed()
1054 /// # .first();
1055 /// keyed_singleton
1056 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
1057 /// # .entries()
1058 /// # }, |mut stream| async move {
1059 /// // { 1: 2, 2: 4 }
1060 /// # for w in vec![(1, 2), (2, 4)] {
1061 /// # assert_eq!(stream.next().await.unwrap(), w);
1062 /// # }
1063 /// # }));
1064 /// # }
1065 /// ```
1066 pub fn inspect_with_key<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Self
1067 where
1068 F: Fn(&(K, V)) + 'a,
1069 {
1070 let inspect_f = f.splice_fn1_borrow_ctx::<(K, V), ()>(&self.location).into();
1071
1072 KeyedSingleton::new(
1073 self.location.clone(),
1074 HydroNode::Inspect {
1075 f: inspect_f,
1076 input: Box::new(self.ir_node.into_inner()),
1077 metadata: self.location.new_node_metadata(Self::collection_kind()),
1078 },
1079 )
1080 }
1081
1082 /// Gets the key-value tuple with the largest key among all entries in this [`KeyedSingleton`].
1083 ///
1084 /// Because this method requires values to be bounded, the output [`Optional`] will only be
1085 /// asynchronously updated if a new key is added that is higher than the previous max key.
1086 ///
1087 /// # Example
1088 /// ```rust
1089 /// # #[cfg(feature = "deploy")] {
1090 /// # use hydro_lang::prelude::*;
1091 /// # use futures::StreamExt;
1092 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1093 /// let tick = process.tick();
1094 /// let keyed_singleton = // { 1: 123, 2: 456, 0: 789 }
1095 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 123), (2, 456), (0, 789)])))
1096 /// # .into_keyed()
1097 /// # .first();
1098 /// keyed_singleton.get_max_key()
1099 /// # .sample_eager(nondet!(/** test */))
1100 /// # }, |mut stream| async move {
1101 /// // (2, 456)
1102 /// # assert_eq!(stream.next().await.unwrap(), (2, 456));
1103 /// # }));
1104 /// # }
1105 /// ```
1106 pub fn get_max_key(self) -> Optional<(K, V), L, B::UnderlyingBound>
1107 where
1108 K: Ord,
1109 {
1110 self.entries()
1111 .assume_ordering_trusted(nondet!(
1112 /// There is only one element associated with each key, and the keys are totallly
1113 /// ordered so we will produce a deterministic value. The closure technically
1114 /// isn't commutative in the case where both passed entries have the same key
1115 /// but different values.
1116 ///
1117 /// In the future, we may want to have an `assume!(...)` statement in the UDF that
1118 /// the two inputs do not have the same key.
1119 ))
1120 .reduce(q!(
1121 move |curr, new| {
1122 if new.0 > curr.0 {
1123 *curr = new;
1124 }
1125 },
1126 idempotent = manual_proof!(/** repeated elements are ignored */)
1127 ))
1128 }
1129
1130 /// Converts this keyed singleton into a [`KeyedStream`] with each group having a single
1131 /// element, the value.
1132 ///
1133 /// This is the equivalent of [`Singleton::into_stream`] but keyed.
1134 ///
1135 /// # Example
1136 /// ```rust
1137 /// # #[cfg(feature = "deploy")] {
1138 /// # use hydro_lang::prelude::*;
1139 /// # use futures::StreamExt;
1140 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1141 /// let keyed_singleton = // { 1: 2, 2: 4 }
1142 /// # Stream::<_, _>::from(process.source_iter(q!(vec![(1, 2), (2, 4)])))
1143 /// # .into_keyed()
1144 /// # .first();
1145 /// keyed_singleton
1146 /// .clone()
1147 /// .into_keyed_stream()
1148 /// .interleave(
1149 /// keyed_singleton.into_keyed_stream()
1150 /// )
1151 /// # .entries()
1152 /// # }, |mut stream| async move {
/// // { 1: [2, 2], 2: [4, 4] }
1154 /// # for w in vec![(1, 2), (2, 4), (1, 2), (2, 4)] {
1155 /// # assert_eq!(stream.next().await.unwrap(), w);
1156 /// # }
1157 /// # }));
1158 /// # }
1159 /// ```
1160 pub fn into_keyed_stream(
1161 self,
1162 ) -> KeyedStream<K, V, L, B::UnderlyingBound, TotalOrder, ExactlyOnce> {
1163 KeyedStream::new(
1164 self.location.clone(),
1165 HydroNode::Cast {
1166 inner: Box::new(self.ir_node.into_inner()),
1167 metadata: self.location.new_node_metadata(KeyedStream::<
1168 K,
1169 V,
1170 L,
1171 B::UnderlyingBound,
1172 TotalOrder,
1173 ExactlyOnce,
1174 >::collection_kind()),
1175 },
1176 )
1177 }
1178}
1179
1180impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, L, B>
1181where
1182 L: Location<'a>,
1183{
1184 /// Shifts this keyed singleton into an atomic context, which guarantees that any downstream logic
1185 /// will all be executed synchronously before any outputs are yielded (in [`KeyedSingleton::end_atomic`]).
1186 ///
1187 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1188 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1189 /// argument that declares where the keyed singleton will be atomically processed. Batching a
1190 /// keyed singleton into the _same_ [`Tick`] will preserve the synchronous execution, while
1191 /// batching into a different [`Tick`] will introduce asynchrony.
1192 pub fn atomic(self, tick: &Tick<L>) -> KeyedSingleton<K, V, Atomic<L>, B> {
1193 let out_location = Atomic { tick: tick.clone() };
1194 KeyedSingleton::new(
1195 out_location.clone(),
1196 HydroNode::BeginAtomic {
1197 inner: Box::new(self.ir_node.into_inner()),
1198 metadata: out_location
1199 .new_node_metadata(KeyedSingleton::<K, V, Atomic<L>, B>::collection_kind()),
1200 },
1201 )
1202 }
1203}
1204
1205impl<'a, K, V, L, B: KeyedSingletonBound> KeyedSingleton<K, V, Atomic<L>, B>
1206where
1207 L: Location<'a> + NoTick,
1208{
1209 /// Yields the elements of this keyed singleton back into a top-level, asynchronous execution context.
1210 /// See [`KeyedSingleton::atomic`] for more details.
1211 pub fn end_atomic(self) -> KeyedSingleton<K, V, L, B> {
1212 KeyedSingleton::new(
1213 self.location.tick.l.clone(),
1214 HydroNode::EndAtomic {
1215 inner: Box::new(self.ir_node.into_inner()),
1216 metadata: self
1217 .location
1218 .tick
1219 .l
1220 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1221 },
1222 )
1223 }
1224}
1225
1226impl<'a, K, V, L: Location<'a>> KeyedSingleton<K, V, Tick<L>, Bounded> {
1227 /// Shifts the state in `self` to the **next tick**, so that the returned keyed singleton at
1228 /// tick `T` always has the entries of `self` at tick `T - 1`.
1229 ///
1230 /// At tick `0`, the output has no entries, since there is no previous tick.
1231 ///
1232 /// This operator enables stateful iterative processing with ticks, by sending data from one
1233 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1234 ///
1235 /// # Example
1236 /// ```rust
1237 /// # #[cfg(feature = "deploy")] {
1238 /// # use hydro_lang::prelude::*;
1239 /// # use futures::StreamExt;
1240 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1241 /// let tick = process.tick();
1242 /// # // ticks are lazy by default, forces the second tick to run
1243 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1244 /// # let batch_first_tick = process
1245 /// # .source_iter(q!(vec![(1, 2), (2, 3)]))
1246 /// # .batch(&tick, nondet!(/** test */))
1247 /// # .into_keyed();
1248 /// # let batch_second_tick = process
1249 /// # .source_iter(q!(vec![(2, 4), (3, 5)]))
1250 /// # .batch(&tick, nondet!(/** test */))
1251 /// # .into_keyed()
1252 /// # .defer_tick(); // appears on the second tick
1253 /// let input_batch = // first tick: { 1: 2, 2: 3 }, second tick: { 2: 4, 3: 5 }
1254 /// # batch_first_tick.chain(batch_second_tick).first();
1255 /// input_batch.clone().filter_key_not_in(
1256 /// input_batch.defer_tick().keys() // keys present in the previous tick
1257 /// )
1258 /// # .entries().all_ticks()
1259 /// # }, |mut stream| async move {
1260 /// // { 1: 2, 2: 3 } (first tick), { 3: 5 } (second tick)
1261 /// # for w in vec![(1, 2), (2, 3), (3, 5)] {
1262 /// # assert_eq!(stream.next().await.unwrap(), w);
1263 /// # }
1264 /// # }));
1265 /// # }
1266 /// ```
1267 pub fn defer_tick(self) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1268 KeyedSingleton::new(
1269 self.location.clone(),
1270 HydroNode::DeferTick {
1271 input: Box::new(self.ir_node.into_inner()),
1272 metadata: self
1273 .location
1274 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1275 },
1276 )
1277 }
1278}
1279
1280impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, L, B>
1281where
1282 L: Location<'a>,
1283{
1284 /// Returns a keyed singleton with a snapshot of each key-value entry at a non-deterministic
1285 /// point in time.
1286 ///
1287 /// # Non-Determinism
1288 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1289 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1290 pub fn snapshot(
1291 self,
1292 tick: &Tick<L>,
1293 _nondet: NonDet,
1294 ) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1295 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1296 KeyedSingleton::new(
1297 tick.clone(),
1298 HydroNode::Batch {
1299 inner: Box::new(self.ir_node.into_inner()),
1300 metadata: tick
1301 .new_node_metadata(KeyedSingleton::<K, V, Tick<L>, Bounded>::collection_kind()),
1302 },
1303 )
1304 }
1305}
1306
1307impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Unbounded>> KeyedSingleton<K, V, Atomic<L>, B>
1308where
1309 L: Location<'a> + NoTick,
1310{
1311 /// Returns a keyed singleton with a snapshot of each key-value entry, consistent with the
1312 /// state of the keyed singleton being atomically processed.
1313 ///
1314 /// # Non-Determinism
1315 /// Because this picks a snapshot of each entry, which is continuously changing, each output has a
1316 /// non-deterministic set of entries since each snapshot can be at an arbitrary point in time.
1317 pub fn snapshot_atomic(self, _nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1318 KeyedSingleton::new(
1319 self.location.clone().tick,
1320 HydroNode::Batch {
1321 inner: Box::new(self.ir_node.into_inner()),
1322 metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1323 K,
1324 V,
1325 Tick<L>,
1326 Bounded,
1327 >::collection_kind(
1328 )),
1329 },
1330 )
1331 }
1332}
1333
1334impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, L, B>
1335where
1336 L: Location<'a>,
1337{
1338 /// Creates a keyed singleton containing only the key-value pairs where the value satisfies a predicate `f`.
1339 ///
1340 /// The closure `f` receives a reference `&V` to each value and returns a boolean. If the predicate
1341 /// returns `true`, the key-value pair is included in the output. If it returns `false`, the pair
1342 /// is filtered out.
1343 ///
1344 /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
1345 /// not modify or take ownership of the values. If you need to modify the values while filtering
1346 /// use [`KeyedSingleton::filter_map`] instead.
1347 ///
1348 /// # Example
1349 /// ```rust
1350 /// # #[cfg(feature = "deploy")] {
1351 /// # use hydro_lang::prelude::*;
1352 /// # use futures::StreamExt;
1353 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1354 /// let keyed_singleton = // { 1: 2, 2: 4, 3: 1 }
1355 /// # process
1356 /// # .source_iter(q!(vec![(1, 2), (2, 4), (3, 1)]))
1357 /// # .into_keyed()
1358 /// # .first();
1359 /// keyed_singleton.filter(q!(|&v| v > 1))
1360 /// # .entries()
1361 /// # }, |mut stream| async move {
1362 /// // { 1: 2, 2: 4 }
1363 /// # let mut results = Vec::new();
1364 /// # for _ in 0..2 {
1365 /// # results.push(stream.next().await.unwrap());
1366 /// # }
1367 /// # results.sort();
1368 /// # assert_eq!(results, vec![(1, 2), (2, 4)]);
1369 /// # }));
1370 /// # }
1371 /// ```
1372 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedSingleton<K, V, L, B>
1373 where
1374 F: Fn(&V) -> bool + 'a,
1375 {
1376 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
1377 let filter_f = q!({
1378 let orig = f;
1379 move |t: &(_, _)| orig(&t.1)
1380 })
1381 .splice_fn1_borrow_ctx::<(K, V), bool>(&self.location)
1382 .into();
1383
1384 KeyedSingleton::new(
1385 self.location.clone(),
1386 HydroNode::Filter {
1387 f: filter_f,
1388 input: Box::new(self.ir_node.into_inner()),
1389 metadata: self
1390 .location
1391 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1392 },
1393 )
1394 }
1395
1396 /// An operator that both filters and maps values. It yields only the key-value pairs where
1397 /// the supplied closure `f` returns `Some(value)`.
1398 ///
1399 /// The closure `f` receives each value `V` and returns `Option<U>`. If the closure returns
1400 /// `Some(new_value)`, the key-value pair `(key, new_value)` is included in the output.
1401 /// If it returns `None`, the key-value pair is filtered out.
1402 ///
1403 /// # Example
1404 /// ```rust
1405 /// # #[cfg(feature = "deploy")] {
1406 /// # use hydro_lang::prelude::*;
1407 /// # use futures::StreamExt;
1408 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1409 /// let keyed_singleton = // { 1: "42", 2: "hello", 3: "100" }
1410 /// # process
1411 /// # .source_iter(q!(vec![(1, "42"), (2, "hello"), (3, "100")]))
1412 /// # .into_keyed()
1413 /// # .first();
1414 /// keyed_singleton.filter_map(q!(|s| s.parse::<i32>().ok()))
1415 /// # .entries()
1416 /// # }, |mut stream| async move {
1417 /// // { 1: 42, 3: 100 }
1418 /// # let mut results = Vec::new();
1419 /// # for _ in 0..2 {
1420 /// # results.push(stream.next().await.unwrap());
1421 /// # }
1422 /// # results.sort();
1423 /// # assert_eq!(results, vec![(1, 42), (3, 100)]);
1424 /// # }));
1425 /// # }
1426 /// ```
1427 pub fn filter_map<F, U>(
1428 self,
1429 f: impl IntoQuotedMut<'a, F, L> + Copy,
1430 ) -> KeyedSingleton<K, U, L, B>
1431 where
1432 F: Fn(V) -> Option<U> + 'a,
1433 {
1434 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
1435 let filter_map_f = q!({
1436 let orig = f;
1437 move |(k, v)| orig(v).map(|o| (k, o))
1438 })
1439 .splice_fn1_ctx::<(K, V), Option<(K, U)>>(&self.location)
1440 .into();
1441
1442 KeyedSingleton::new(
1443 self.location.clone(),
1444 HydroNode::FilterMap {
1445 f: filter_map_f,
1446 input: Box::new(self.ir_node.into_inner()),
1447 metadata: self
1448 .location
1449 .new_node_metadata(KeyedSingleton::<K, U, L, B>::collection_kind()),
1450 },
1451 )
1452 }
1453
1454 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that have
1455 /// arrived since the previous batch was released.
1456 ///
1457 /// Currently, there is no `all_ticks` dual on [`KeyedSingleton`], instead you may want to use
1458 /// [`KeyedSingleton::into_keyed_stream`] then yield with [`KeyedStream::all_ticks`].
1459 ///
1460 /// # Non-Determinism
1461 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1462 /// has a non-deterministic set of key-value pairs.
1463 pub fn batch(self, tick: &Tick<L>, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded>
1464 where
1465 L: NoTick,
1466 {
1467 self.atomic(tick).batch_atomic(nondet)
1468 }
1469}
1470
1471impl<'a, K, V, L, B: KeyedSingletonBound<ValueBound = Bounded>> KeyedSingleton<K, V, Atomic<L>, B>
1472where
1473 L: Location<'a> + NoTick,
1474{
1475 /// Returns a keyed singleton with entries consisting of _new_ key-value pairs that are being
1476 /// atomically processed.
1477 ///
1478 /// Currently, there is no dual to asynchronously yield back outside the tick, instead you
1479 /// should use [`KeyedSingleton::into_keyed_stream`] and yield a [`KeyedStream`].
1480 ///
1481 /// # Non-Determinism
1482 /// Because this picks a batch of asynchronously added entries, each output keyed singleton
1483 /// has a non-deterministic set of key-value pairs.
1484 pub fn batch_atomic(self, nondet: NonDet) -> KeyedSingleton<K, V, Tick<L>, Bounded> {
1485 let _ = nondet;
1486 KeyedSingleton::new(
1487 self.location.clone().tick,
1488 HydroNode::Batch {
1489 inner: Box::new(self.ir_node.into_inner()),
1490 metadata: self.location.tick.new_node_metadata(KeyedSingleton::<
1491 K,
1492 V,
1493 Tick<L>,
1494 Bounded,
1495 >::collection_kind(
1496 )),
1497 },
1498 )
1499 }
1500}
1501
#[cfg(test)]
mod tests {
    #[cfg(feature = "deploy")]
    use futures::{SinkExt, StreamExt};
    #[cfg(feature = "deploy")]
    use hydro_deploy::Deployment;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use stageleft::q;

    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::compile::builder::FlowBuilder;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::location::Location;
    #[cfg(any(feature = "deploy", feature = "sim"))]
    use crate::nondet::nondet;

    // `key_count` over a keyed singleton built with `first()`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_bounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_node = nodes.connect(input_port).await;
        let mut from_node = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Before any input is sent, the first sampled count is zero.
        assert_eq!(from_node.next().await.unwrap(), 0);

        // Each `(message, expected count)` pair is sent and checked in order.
        for (message, expected_count) in [((1, 1), 1), ((2, 2), 2)] {
            to_node.send(message).await.unwrap();
            assert_eq!(from_node.next().await.unwrap(), expected_count);
        }
    }

    // `key_count` over a keyed singleton built with a per-key `fold`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn key_count_unbounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .key_count()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_node = nodes.connect(input_port).await;
        let mut from_node = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Before any input is sent, the first sampled count is zero.
        assert_eq!(from_node.next().await.unwrap(), 0);

        // Messages for an already-seen key (1) leave the expected count unchanged.
        let steps = [
            ((1, 1), 1),
            ((1, 2), 1),
            ((2, 2), 2),
            ((1, 1), 2),
            ((3, 1), 3),
        ];
        for (message, expected_count) in steps {
            to_node.send(message).await.unwrap();
            assert_eq!(from_node.next().await.unwrap(), expected_count);
        }
    }

    // `into_singleton` over a keyed singleton built with `first()`.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_bounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .first()
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_node = nodes.connect(input_port).await;
        let mut from_node = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Before any input is sent, the first sampled map is empty.
        assert_eq!(
            from_node.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        // Each message is paired with the full map expected after it has been processed.
        let steps = [
            ((1, 1), vec![(1, 1)]),
            ((2, 2), vec![(1, 1), (2, 2)]),
        ];
        for (message, expected_entries) in steps {
            to_node.send(message).await.unwrap();
            assert_eq!(
                from_node.next().await.unwrap(),
                expected_entries.into_iter().collect()
            );
        }
    }

    // `into_singleton` over a keyed singleton built with a per-key `fold` (a per-key counter).
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn into_singleton_unbounded_value() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_port, input) = node.source_external_bincode(&external);
        let out = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .into_singleton()
            .sample_eager(nondet!(/** test */))
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut to_node = nodes.connect(input_port).await;
        let mut from_node = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Before any input is sent, the first sampled map is empty.
        assert_eq!(
            from_node.next().await.unwrap(),
            std::collections::HashMap::new()
        );

        // The fold ignores the message value, so each expected map holds per-key message counts.
        let steps = [
            ((1, 1), vec![(1, 1)]),
            ((1, 2), vec![(1, 2)]),
            ((2, 2), vec![(1, 2), (2, 1)]),
            ((1, 1), vec![(1, 3), (2, 1)]),
            ((3, 1), vec![(1, 3), (2, 1), (3, 1)]),
        ];
        for (message, expected_entries) in steps {
            to_node.send(message).await.unwrap();
            assert_eq!(
                from_node.next().await.unwrap(),
                expected_entries.into_iter().collect()
            );
        }
    }

    // Exhaustive simulation of snapshotting a keyed per-key counter.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_unbounded_singleton_snapshot() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (input_port, input) = node.sim_input();
        let output = input
            .into_keyed()
            .fold(q!(|| 0), q!(|acc, _| *acc += 1))
            .snapshot(&node.tick(), nondet!(/** test */))
            .entries()
            .all_ticks()
            .sim_output();

        let explored = flow.sim().exhaustive(async || {
            input_port.send((1, 123));
            input_port.send((1, 456));
            input_port.send((2, 123));

            // In every explored execution the largest collected entry must be `(2, 1)`.
            let observed = output.collect_sorted::<Vec<_>>().await;
            assert_eq!(observed.last().unwrap(), &(2, 1));
        });

        // Pins the value returned by `exhaustive` for this program.
        assert_eq!(explored, 8);
    }

    // Joining a keyed singleton with a keyed stream keeps only the keys present in both.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn join_keyed_stream() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = node.tick();
        let keyed_data = node
            .source_iter(q!(vec![(1, 10), (2, 20)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */))
            .first();
        let requests = node
            .source_iter(q!(vec![(1, 100), (2, 200), (3, 300)]))
            .into_keyed()
            .batch(&tick, nondet!(/** test */));

        let out = keyed_data
            .join_keyed_stream(requests)
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut from_node = nodes.connect(out).await;

        deployment.start().await.unwrap();

        // Two joined entries are expected; they are sorted before comparison.
        let mut joined = Vec::new();
        for _ in 0..2 {
            let entry = from_node.next().await.unwrap();
            joined.push(entry);
        }
        joined.sort();

        assert_eq!(joined, vec![(1, (10, 100)), (2, (20, 200))]);
    }
}