semigroup/
concurrent.rs

1use std::future::Future;
2
3use futures::{Stream, StreamExt, TryStream, TryStreamExt};
4
5use crate::{Commutative, Semigroup};
6
/// EXPERIMENTAL: asynchronous counterpart of [`Semigroup`].
///
/// Implementors supply [`AsyncSemigroup::async_op_assign`]; every other method
/// has a default implementation built on top of it.
pub trait AsyncSemigroup: Sized {
    /// Returns a future that combines `other` into `base` in place.
    fn async_op_assign(base: &mut Self, other: Self) -> impl Future<Output = ()>;

    /// By-value form of [`AsyncSemigroup::async_op_assign`]: the future resolves
    /// to `base` after `other` has been combined into it.
    fn async_op(mut base: Self, other: Self) -> impl Future<Output = Self> {
        async move {
            <Self as AsyncSemigroup>::async_op_assign(&mut base, other).await;
            base
        }
    }

    /// Method-call form of [`AsyncSemigroup::async_op_assign`].
    fn async_semigroup_assign(&mut self, other: Self) -> impl Future<Output = ()> {
        <Self as AsyncSemigroup>::async_op_assign(self, other)
    }

    /// Method-call form of [`AsyncSemigroup::async_op`].
    fn async_semigroup(self, other: Self) -> impl Future<Output = Self> {
        <Self as AsyncSemigroup>::async_op(self, other)
    }
}
23impl<T: Semigroup> AsyncSemigroup for T {
24    async fn async_op_assign(base: &mut Self, other: Self) {
25        Semigroup::op_assign(base, other)
26    }
27}
28
29/// Async version of [`Commutative`].
30pub trait AsyncCommutative: AsyncSemigroup + Commutative {
31    /// Used by [`CombineStream::fold_semigroup`].
32    fn fold_stream(init: Self, stream: impl Stream<Item = Self>) -> impl Future<Output = Self> {
33        async { stream.fold(init, Self::async_op).await }
34    }
35    /// Used by [`CombineStream::reduce_semigroup`].
36    fn reduce_stream(
37        mut stream: impl Stream<Item = Self> + Unpin,
38    ) -> impl Future<Output = Option<Self>> {
39        async {
40            let init = stream.next().await?;
41            Some(Self::fold_stream(init, stream).await)
42        }
43    }
44    /// Used by [`CombineStream::combine_monoid`].
45    #[cfg(feature = "monoid")]
46    fn combine_stream(stream: impl Stream<Item = Self>) -> impl Future<Output = Self>
47    where
48        Self: crate::Monoid,
49    {
50        async { Self::fold_stream(Self::identity(), stream).await }
51    }
52}
53impl<T: Commutative> AsyncCommutative for T {}
54
55/// Extensions for [`Stream`]s that items implement [`AsyncCommutative`]. Like [`crate::CombineIterator`].
56pub trait CombineStream: Sized + Stream {
57    /// This method like [`crate::CombineIterator::fold_final`], but stream.
58    ///
59    /// # Examples
60    /// ```
61    /// # futures::executor::block_on(async {
62    /// use futures::StreamExt;
63    /// use semigroup::{op::Sum, CombineStream, Semigroup};
64    /// let s1 = futures::stream::iter(0..10);
65    /// let sum = s1.map(Sum).fold_semigroup(Sum(0));
66    /// assert_eq!(sum.await, Sum(45));
67    ///
68    /// let s2 = futures::stream::iter(0..0);
69    /// let empty = s2.map(Sum).fold_semigroup(Sum(0));
70    /// assert_eq!(empty.await, Sum(0))
71    /// # });
72    /// ```
73    ///
74    /// # Type safety
75    /// This method is only available when item implements [`Commutative`].
76    /// ```compile_fail
77    /// # futures::executor::block_on(async {
78    /// use futures::StreamExt;
79    /// use semigroup::{op::Coalesce, CombineStream, Semigroup};
80    /// let stream = futures::stream::iter(0..10);
81    /// let cannot_coalesce = stream.map(Some).map(Coalesce).fold_semigroup(Coalesce(None));
82    /// # });
83    /// ```
84    fn fold_semigroup(self, init: Self::Item) -> impl Future<Output = Self::Item>
85    where
86        Self::Item: AsyncCommutative,
87    {
88        Self::Item::fold_stream(init, self)
89    }
90
91    /// This method like [`crate::CombineIterator::lreduce`], but stream.
92    ///
93    /// # Example
94    /// ```
95    /// # futures::executor::block_on(async {
96    /// use futures::StreamExt;
97    /// use semigroup::{op::Sum, CombineStream, Semigroup};
98    /// let s1 = futures::stream::iter(0..10);
99    /// let sum = s1.map(Sum).reduce_semigroup();
100    /// assert_eq!(sum.await, Some(Sum(45)));
101    ///
102    /// let s2 = futures::stream::iter(0..0);
103    /// let empty = s2.map(Sum).reduce_semigroup();
104    /// assert_eq!(empty.await, None)
105    /// # });
106    /// ```
107    ///
108    /// # Type safety
109    /// This method is only available when item implements [`Commutative`].
110    /// ```compile_fail
111    /// # futures::executor::block_on(async {
112    /// use futures::StreamExt;
113    /// use semigroup::{op::Coalesce, CombineStream, Semigroup};
114    /// let stream = futures::stream::iter(0..10);
115    /// let cannot_coalesce = stream.map(Some).map(Coalesce).reduce_semigroup();
116    /// # });
117    /// ```
118    fn reduce_semigroup(self) -> impl Future<Output = Option<Self::Item>>
119    where
120        Self: Unpin,
121        Self::Item: AsyncCommutative,
122    {
123        Self::Item::reduce_stream(self)
124    }
125
126    /// This method like [`crate::CombineIterator::combine`], but stream.
127    ///
128    /// # Example
129    /// ```
130    /// # futures::executor::block_on(async {
131    /// use futures::StreamExt;
132    /// use semigroup::{op::Sum, CombineStream, Semigroup};
133    /// let s1 = futures::stream::iter(0..10);
134    /// let sum = s1.map(Sum).combine_monoid();
135    /// assert_eq!(sum.await, Sum(45));
136    ///
137    /// let s2 = futures::stream::iter(0..0);
138    /// let empty = s2.map(Sum).combine_monoid();
139    /// assert_eq!(empty.await, Sum(0))
140    /// # });
141    /// ```
142    ///
143    /// # Type safety
144    /// This method is only available when item implements [`Commutative`].
145    /// ```compile_fail
146    /// # futures::executor::block_on(async {
147    /// use futures::StreamExt;
148    /// use semigroup::{op::Coalesce, CombineStream, Semigroup};
149    /// let stream = futures::stream::iter(0..10);
150    /// let cannot_coalesce = stream.map(Some).map(Coalesce).combine_monoid();
151    /// # });
152    /// ```
153    #[cfg(feature = "monoid")]
154    fn combine_monoid(self) -> impl Future<Output = Self::Item>
155    where
156        Self::Item: AsyncCommutative + crate::Monoid,
157    {
158        Self::Item::combine_stream(self)
159    }
160}
161impl<T: Stream> CombineStream for T {}
162
163/// Async try version of [`Commutative`].
164pub trait TryAsyncCommutative: AsyncCommutative {
165    /// Used by [`TryCombineStream::try_fold_semigroup`].
166    fn try_fold_stream<S: TryStream<Ok = Self>>(
167        init: Self,
168        stream: S,
169    ) -> impl Future<Output = Result<Self, S::Error>> {
170        let ok_async_op = |b, o| async { Ok(Self::async_op(b, o).await) };
171        async move { stream.try_fold(init, ok_async_op).await }
172    }
173    /// Used by [`TryCombineStream::try_reduce_semigroup`].
174    fn try_reduce_stream<S: TryStream<Item = Result<Self, E>, Ok = Self, Error = E> + Unpin, E>(
175        mut stream: S,
176    ) -> impl Future<Output = Option<Result<Self, E>>> {
177        async {
178            let init = stream.next().await?;
179            match init {
180                Ok(init) => Some(Self::try_fold_stream(init, stream).await),
181                Err(err) => Some(Err(err)),
182            }
183        }
184    }
185    /// Used by [`TryCombineStream::try_combine_monoid`].
186    #[cfg(feature = "monoid")]
187    fn try_combine_stream<S: TryStream<Ok = Self>>(
188        stream: S,
189    ) -> impl Future<Output = Result<Self, S::Error>>
190    where
191        Self: crate::Monoid,
192    {
193        async { Self::try_fold_stream(Self::identity(), stream).await }
194    }
195}
196impl<T: Commutative> TryAsyncCommutative for T {}
197
198/// Extensions for [`TryStream`]s that items implement [`TryAsyncCommutative`]. Like [`crate::CombineIterator`].
199pub trait TryCombineStream: Sized + TryStream {
200    /// This method like [`crate::CombineIterator::fold_final`], but stream.
201    ///
202    /// # Examples
203    /// ```
204    /// # futures::executor::block_on(async {
205    /// use std::convert::Infallible;
206    /// use futures::StreamExt;
207    /// use semigroup::{op::Sum, TryCombineStream, Semigroup};
208    /// let s1 = futures::stream::iter((0..10).map(Sum).map(Ok::<_, Infallible>));
209    /// let sum = s1.try_fold_semigroup(Sum(0));
210    /// assert_eq!(sum.await, Ok(Sum(45)));
211    ///
212    /// let s2 = futures::stream::iter(vec![Ok(Sum(1)), Err(2), Ok(Sum(3))]);
213    /// let empty = s2.try_fold_semigroup(Sum(0));
214    /// assert_eq!(empty.await, Err(2));
215    ///
216    /// let s3 = futures::stream::iter((0..0).map(Sum).map(Ok::<_, Infallible>));
217    /// let empty = s3.try_fold_semigroup(Sum(0));
218    /// assert_eq!(empty.await, Ok(Sum(0)))
219    /// # });
220    /// ```
221    ///
222    /// # Type safety
223    /// This method is only available when item implements [`Commutative`].
224    /// Same to [`CombineStream::fold_semigroup`].
225    fn try_fold_semigroup(
226        self,
227        init: Self::Ok,
228    ) -> impl Future<Output = Result<Self::Ok, Self::Error>>
229    where
230        Self::Ok: TryAsyncCommutative,
231    {
232        Self::Ok::try_fold_stream(init, self)
233    }
234
235    /// This method like [`crate::CombineIterator::lreduce`], but stream.
236    ///
237    /// # Example
238    /// ```
239    /// # futures::executor::block_on(async {
240    /// use std::convert::Infallible;
241    /// use futures::StreamExt;
242    /// use semigroup::{op::Sum, TryCombineStream, Semigroup};
243    /// let s1 = futures::stream::iter(0..10).map(Sum).map(Ok::<_, Infallible>);
244    /// let sum = s1.try_reduce_semigroup();
245    /// assert_eq!(sum.await, Some(Ok(Sum(45))));
246    ///
247    /// let s2 = futures::stream::iter(vec![Ok(Sum(1)), Err(2), Ok(Sum(3))]);
248    /// let empty = s2.try_reduce_semigroup();
249    /// assert_eq!(empty.await, Some(Err(2)));
250    ///
251    /// let s3 = futures::stream::iter((0..0).map(Sum).map(Ok::<_, Infallible>));
252    /// let empty = s3.try_reduce_semigroup();
253    /// assert_eq!(empty.await, None);
254    /// # });
255    /// ```
256    ///
257    /// # Type safety
258    /// This method is only available when item implements [`Commutative`].
259    /// Same to [`CombineStream::reduce_semigroup`].
260    fn try_reduce_semigroup<T, E>(
261        self,
262    ) -> impl Future<Output = Option<Result<Self::Ok, Self::Error>>>
263    where
264        T: TryAsyncCommutative,
265        Self: Unpin,
266        Self: TryStream<Item = Result<T, E>, Ok = T, Error = E>,
267    {
268        Self::Ok::try_reduce_stream(self)
269    }
270    /// This method like [`crate::CombineIterator::combine`], but stream.
271    ///
272    /// # Example
273    /// ```
274    /// # futures::executor::block_on(async {
275    /// use std::convert::Infallible;
276    /// use futures::StreamExt;
277    /// use semigroup::{op::Sum, TryCombineStream, Semigroup};
278    /// let s1 = futures::stream::iter(0..10).map(Sum).map(Ok::<_, Infallible>);
279    /// let sum = s1.try_combine_monoid();
280    /// assert_eq!(sum.await, Ok(Sum(45)));
281    ///
282    /// let s2 = futures::stream::iter(vec![Ok(Sum(1)), Err(2), Ok(Sum(3))]);
283    /// let empty = s2.try_combine_monoid();
284    /// assert_eq!(empty.await, Err(2));
285    ///
286    /// let s3 = futures::stream::iter((0..0).map(Sum).map(Ok::<_, Infallible>));
287    /// let empty = s3.try_combine_monoid();
288    /// assert_eq!(empty.await, Ok(Sum(0)))
289    /// # });
290    /// ```
291    ///
292    /// # Type safety
293    /// This method is only available when item implements [`Commutative`].
294    /// Same to [`CombineStream::combine_monoid`].
295    #[cfg(feature = "monoid")]
296    fn try_combine_monoid(self) -> impl Future<Output = Result<Self::Ok, Self::Error>>
297    where
298        Self::Ok: TryAsyncCommutative + crate::Monoid,
299    {
300        Self::Ok::try_combine_stream(self)
301    }
302}
303impl<T: TryStream> TryCombineStream for T {}
304
#[cfg(feature = "test")]
pub mod test_async_semigroup {
    use std::fmt::Debug;

    use super::*;

    /// Assert that the given values satisfy the *async semigroup* property
    /// (associativity of [`AsyncSemigroup::async_op`]).
    ///
    /// The expansion contains an `.await`, so the macro must be invoked inside
    /// an async context.
    ///
    /// # Usage
    /// Same as [`crate::assert_semigroup!`].
    ///
    /// # Examples
    /// ```
    /// use semigroup::{assert_async_semigroup, op::Sum};
    ///
    /// let a = Sum(1);
    /// let b = Sum(2);
    /// let c = Sum(3);
    /// futures::executor::block_on(async {
    ///     assert_async_semigroup!(a, b, c);
    /// });
    ///
    /// let v = vec![a, b, c];
    /// futures::executor::block_on(async {
    ///     assert_async_semigroup!(&v);
    /// });
    /// ```
    ///
    /// # Panics
    /// - If the given values do not satisfy the *async semigroup* property.
    /// ```should_panic
    /// use semigroup::{assert_async_semigroup, Op, Semigroup};
    /// #[derive(Debug, Clone, PartialEq, Op)]
    /// #[op(commutative)]
    /// pub struct Sub(i32);
    /// impl Op<i32> for Sub {
    ///     fn lift_op_assign(base: &mut i32, other: i32) {
    ///         *base -= other;
    ///     }
    /// }
    /// let a = Sub(1);
    /// let b = Sub(2);
    /// let c = Sub(3);
    /// futures::executor::block_on(async {
    ///     assert_async_semigroup!(a, b, c);
    /// });
    /// ```
    ///
    /// - If the input has fewer than 3 items.
    /// ```compile_fail
    /// use semigroup::{assert_async_semigroup, op::Sum};
    /// let a = Sum(1);
    /// let b = Sum(2);
    /// futures::executor::block_on(async {
    ///     assert_async_semigroup!(a, b);
    /// });
    /// ```
    /// ```should_panic
    /// use semigroup::{assert_async_semigroup, op::Sum};
    /// let a = Sum(1);
    /// let b = Sum(2);
    /// futures::executor::block_on(async {
    ///     assert_async_semigroup!(&vec![a, b]);
    /// });
    /// ```
    #[macro_export]
    macro_rules! assert_async_semigroup {
        ($a:expr, $b: expr, $($tail: expr),*) => {
            {
                let v = vec![$a, $b, $($tail),*];
                $crate::assert_async_semigroup!(&v)
            }
        };
        ($v:expr) => {
            {
                let (a, b, c) = $crate::test_semigroup::pick3($v);
                $crate::test_async_semigroup::assert_async_semigroup_impl(a.clone(), b.clone(), c.clone()).await;
            }
        };
    }

    /// Implementation behind [`crate::assert_async_semigroup!`]: checks the
    /// associative law for `a`, `b` and `c`.
    ///
    /// Only [`AsyncSemigroup`] is required, so non-commutative semigroups can be
    /// checked as well.
    ///
    /// # Panics
    /// Panics when the associative law does not hold for the given values.
    pub async fn assert_async_semigroup_impl<T: AsyncSemigroup + Clone + PartialEq + Debug>(
        a: T,
        b: T,
        c: T,
    ) {
        // The arguments are already owned, so they are forwarded without cloning.
        assert_async_associative_law(a, b, c).await;
    }

    /// Assert that the given values satisfy the *async commutative* property.
    ///
    /// The expansion contains an `.await`, so the macro must be invoked inside
    /// an async context.
    ///
    /// # Usage
    /// Same as [`crate::assert_semigroup!`].
    ///
    /// # Examples
    /// ```
    /// use semigroup::{assert_async_commutative, op::Sum};
    ///
    /// let a = Sum(1);
    /// let b = Sum(2);
    /// let c = Sum(3);
    /// futures::executor::block_on(async {
    ///     assert_async_commutative!(a, b, c);
    /// });
    ///
    /// let v = vec![a, b, c];
    /// futures::executor::block_on(async {
    ///     assert_async_commutative!(&v);
    /// });
    /// ```
    ///
    /// # Panics
    /// - If the given values do not satisfy the *async commutative* property.
    /// ```should_panic
    /// use semigroup::{assert_async_commutative, Op, Semigroup};
    /// #[derive(Debug, Clone, PartialEq, Op)]
    /// #[op(commutative)]
    /// pub struct Sub(i32);
    /// impl Op<i32> for Sub {
    ///     fn lift_op_assign(base: &mut i32, other: i32) {
    ///         *base -= other;
    ///     }
    /// }
    /// let a = Sub(1);
    /// let b = Sub(2);
    /// let c = Sub(3);
    /// futures::executor::block_on(async {
    ///     assert_async_commutative!(a, b, c);
    /// });
    /// ```
    ///
    /// - If the input has fewer than 3 items.
    /// ```compile_fail
    /// use semigroup::{assert_async_commutative, op::Sum};
    /// let a = Sum(1);
    /// let b = Sum(2);
    /// futures::executor::block_on(async {
    ///     assert_async_commutative!(a, b);
    /// });
    /// ```
    /// ```should_panic
    /// use semigroup::{assert_async_commutative, op::Sum};
    /// let a = Sum(1);
    /// let b = Sum(2);
    /// futures::executor::block_on(async {
    ///     assert_async_commutative!(&vec![a, b]);
    /// });
    /// ```
    #[macro_export]
    macro_rules! assert_async_commutative {
        ($a:expr, $b: expr, $($tail: expr),*) => {
            {
                let v = vec![$a, $b, $($tail),*];
                $crate::assert_async_commutative!(&v)
            }
        };
        ($v:expr) => {
            {
                let (a, b, c) = $crate::test_semigroup::pick3($v);
                $crate::test_async_semigroup::assert_async_commutative_impl(a.clone(), b.clone(), c.clone()).await;
            }
        };
    }

    /// Implementation behind [`crate::assert_async_commutative!`]: checks the
    /// commutative law for `a`, `b` and `c`.
    ///
    /// # Panics
    /// Panics when the commutative law does not hold for the given values.
    pub async fn assert_async_commutative_impl<T: AsyncCommutative + Clone + PartialEq + Debug>(
        a: T,
        b: T,
        c: T,
    ) {
        // The arguments are already owned, so they are forwarded without cloning.
        assert_async_commutative_law(a, b, c).await;
    }

    /// Resolves to `(x · y) · z`, awaiting each [`AsyncSemigroup::async_op`] in turn.
    async fn combine_left<T: AsyncSemigroup>(x: T, y: T, z: T) -> T {
        T::async_op(T::async_op(x, y).await, z).await
    }

    /// Asserts that `(a · b) · c == a · (b · c)` under [`AsyncSemigroup::async_op`].
    ///
    /// # Panics
    /// Panics when the two groupings produce different values.
    pub async fn assert_async_associative_law<T: AsyncSemigroup + Clone + PartialEq + Debug>(
        a: T,
        b: T,
        c: T,
    ) {
        let ab_c = combine_left(a.clone(), b.clone(), c.clone()).await;
        // Last use of each value: move instead of cloning.
        let a_bc = T::async_op(a, T::async_op(b, c).await).await;
        assert_eq!(ab_c, a_bc);
    }

    /// Asserts that combining `a`, `b` and `c` (left-nested) yields the same
    /// value for all six orderings of the operands.
    ///
    /// # Panics
    /// Panics when any two orderings produce different values.
    pub async fn assert_async_commutative_law<T: AsyncCommutative + Clone + PartialEq + Debug>(
        a: T,
        b: T,
        c: T,
    ) {
        let abc = combine_left(a.clone(), b.clone(), c.clone()).await;
        let bca = combine_left(b.clone(), c.clone(), a.clone()).await;
        let cba = combine_left(c.clone(), b.clone(), a.clone()).await;
        let acb = combine_left(a.clone(), c.clone(), b.clone()).await;
        let bac = combine_left(b.clone(), a.clone(), c.clone()).await;
        // Last use of each value: move instead of cloning.
        let cab = combine_left(c, a, b).await;

        assert_eq!(abc, bca);
        assert_eq!(bca, cba);
        assert_eq!(cba, acb);
        assert_eq!(acb, bac);
        assert_eq!(bac, cab);
        assert_eq!(cab, abc);
    }
}