semigroup/concurrent.rs
1use std::future::Future;
2
3use futures::{Stream, StreamExt, TryStream, TryStreamExt};
4
5use crate::{Commutative, Semigroup};
6
7/// EXPERIMENTAL: Async version of [`Semigroup`].
8pub trait AsyncSemigroup: Sized {
9 fn async_op_assign(base: &mut Self, other: Self) -> impl Future<Output = ()>;
10 fn async_op(mut base: Self, other: Self) -> impl Future<Output = Self> {
11 async {
12 Self::async_op_assign(&mut base, other).await;
13 base
14 }
15 }
16 fn async_semigroup_assign(&mut self, other: Self) -> impl Future<Output = ()> {
17 Self::async_op_assign(self, other)
18 }
19 fn async_semigroup(self, other: Self) -> impl Future<Output = Self> {
20 Self::async_op(self, other)
21 }
22}
23impl<T: Semigroup> AsyncSemigroup for T {
24 async fn async_op_assign(base: &mut Self, other: Self) {
25 Semigroup::op_assign(base, other)
26 }
27}
28
29/// Async version of [`Commutative`].
30pub trait AsyncCommutative: AsyncSemigroup + Commutative {
31 /// Used by [`CombineStream::fold_semigroup`].
32 fn fold_stream(init: Self, stream: impl Stream<Item = Self>) -> impl Future<Output = Self> {
33 async { stream.fold(init, Self::async_op).await }
34 }
35 /// Used by [`CombineStream::reduce_semigroup`].
36 fn reduce_stream(
37 mut stream: impl Stream<Item = Self> + Unpin,
38 ) -> impl Future<Output = Option<Self>> {
39 async {
40 let init = stream.next().await?;
41 Some(Self::fold_stream(init, stream).await)
42 }
43 }
44 /// Used by [`CombineStream::combine_monoid`].
45 #[cfg(feature = "monoid")]
46 fn combine_stream(stream: impl Stream<Item = Self>) -> impl Future<Output = Self>
47 where
48 Self: crate::Monoid,
49 {
50 async { Self::fold_stream(Self::identity(), stream).await }
51 }
52}
53impl<T: Commutative> AsyncCommutative for T {}
54
55/// Extensions for [`Stream`]s that items implement [`AsyncCommutative`]. Like [`crate::CombineIterator`].
56pub trait CombineStream: Sized + Stream {
57 /// This method like [`crate::CombineIterator::fold_final`], but stream.
58 ///
59 /// # Examples
60 /// ```
61 /// # futures::executor::block_on(async {
62 /// use futures::StreamExt;
63 /// use semigroup::{op::Sum, CombineStream, Semigroup};
64 /// let s1 = futures::stream::iter(0..10);
65 /// let sum = s1.map(Sum).fold_semigroup(Sum(0));
66 /// assert_eq!(sum.await, Sum(45));
67 ///
68 /// let s2 = futures::stream::iter(0..0);
69 /// let empty = s2.map(Sum).fold_semigroup(Sum(0));
70 /// assert_eq!(empty.await, Sum(0))
71 /// # });
72 /// ```
73 ///
74 /// # Type safety
75 /// This method is only available when item implements [`Commutative`].
76 /// ```compile_fail
77 /// # futures::executor::block_on(async {
78 /// use futures::StreamExt;
79 /// use semigroup::{op::Coalesce, CombineStream, Semigroup};
80 /// let stream = futures::stream::iter(0..10);
81 /// let cannot_coalesce = stream.map(Some).map(Coalesce).fold_semigroup(Coalesce(None));
82 /// # });
83 /// ```
84 fn fold_semigroup(self, init: Self::Item) -> impl Future<Output = Self::Item>
85 where
86 Self::Item: AsyncCommutative,
87 {
88 Self::Item::fold_stream(init, self)
89 }
90
91 /// This method like [`crate::CombineIterator::lreduce`], but stream.
92 ///
93 /// # Example
94 /// ```
95 /// # futures::executor::block_on(async {
96 /// use futures::StreamExt;
97 /// use semigroup::{op::Sum, CombineStream, Semigroup};
98 /// let s1 = futures::stream::iter(0..10);
99 /// let sum = s1.map(Sum).reduce_semigroup();
100 /// assert_eq!(sum.await, Some(Sum(45)));
101 ///
102 /// let s2 = futures::stream::iter(0..0);
103 /// let empty = s2.map(Sum).reduce_semigroup();
104 /// assert_eq!(empty.await, None)
105 /// # });
106 /// ```
107 ///
108 /// # Type safety
109 /// This method is only available when item implements [`Commutative`].
110 /// ```compile_fail
111 /// # futures::executor::block_on(async {
112 /// use futures::StreamExt;
113 /// use semigroup::{op::Coalesce, CombineStream, Semigroup};
114 /// let stream = futures::stream::iter(0..10);
115 /// let cannot_coalesce = stream.map(Some).map(Coalesce).reduce_semigroup();
116 /// # });
117 /// ```
118 fn reduce_semigroup(self) -> impl Future<Output = Option<Self::Item>>
119 where
120 Self: Unpin,
121 Self::Item: AsyncCommutative,
122 {
123 Self::Item::reduce_stream(self)
124 }
125
126 /// This method like [`crate::CombineIterator::combine`], but stream.
127 ///
128 /// # Example
129 /// ```
130 /// # futures::executor::block_on(async {
131 /// use futures::StreamExt;
132 /// use semigroup::{op::Sum, CombineStream, Semigroup};
133 /// let s1 = futures::stream::iter(0..10);
134 /// let sum = s1.map(Sum).combine_monoid();
135 /// assert_eq!(sum.await, Sum(45));
136 ///
137 /// let s2 = futures::stream::iter(0..0);
138 /// let empty = s2.map(Sum).combine_monoid();
139 /// assert_eq!(empty.await, Sum(0))
140 /// # });
141 /// ```
142 ///
143 /// # Type safety
144 /// This method is only available when item implements [`Commutative`].
145 /// ```compile_fail
146 /// # futures::executor::block_on(async {
147 /// use futures::StreamExt;
148 /// use semigroup::{op::Coalesce, CombineStream, Semigroup};
149 /// let stream = futures::stream::iter(0..10);
150 /// let cannot_coalesce = stream.map(Some).map(Coalesce).combine_monoid();
151 /// # });
152 /// ```
153 #[cfg(feature = "monoid")]
154 fn combine_monoid(self) -> impl Future<Output = Self::Item>
155 where
156 Self::Item: AsyncCommutative + crate::Monoid,
157 {
158 Self::Item::combine_stream(self)
159 }
160}
161impl<T: Stream> CombineStream for T {}
162
163/// Async try version of [`Commutative`].
164pub trait TryAsyncCommutative: AsyncCommutative {
165 /// Used by [`TryCombineStream::try_fold_semigroup`].
166 fn try_fold_stream<S: TryStream<Ok = Self>>(
167 init: Self,
168 stream: S,
169 ) -> impl Future<Output = Result<Self, S::Error>> {
170 let ok_async_op = |b, o| async { Ok(Self::async_op(b, o).await) };
171 async move { stream.try_fold(init, ok_async_op).await }
172 }
173 /// Used by [`TryCombineStream::try_reduce_semigroup`].
174 fn try_reduce_stream<S: TryStream<Item = Result<Self, E>, Ok = Self, Error = E> + Unpin, E>(
175 mut stream: S,
176 ) -> impl Future<Output = Option<Result<Self, E>>> {
177 async {
178 let init = stream.next().await?;
179 match init {
180 Ok(init) => Some(Self::try_fold_stream(init, stream).await),
181 Err(err) => Some(Err(err)),
182 }
183 }
184 }
185 /// Used by [`TryCombineStream::try_combine_monoid`].
186 #[cfg(feature = "monoid")]
187 fn try_combine_stream<S: TryStream<Ok = Self>>(
188 stream: S,
189 ) -> impl Future<Output = Result<Self, S::Error>>
190 where
191 Self: crate::Monoid,
192 {
193 async { Self::try_fold_stream(Self::identity(), stream).await }
194 }
195}
196impl<T: Commutative> TryAsyncCommutative for T {}
197
198/// Extensions for [`TryStream`]s that items implement [`TryAsyncCommutative`]. Like [`crate::CombineIterator`].
199pub trait TryCombineStream: Sized + TryStream {
200 /// This method like [`crate::CombineIterator::fold_final`], but stream.
201 ///
202 /// # Examples
203 /// ```
204 /// # futures::executor::block_on(async {
205 /// use std::convert::Infallible;
206 /// use futures::StreamExt;
207 /// use semigroup::{op::Sum, TryCombineStream, Semigroup};
208 /// let s1 = futures::stream::iter((0..10).map(Sum).map(Ok::<_, Infallible>));
209 /// let sum = s1.try_fold_semigroup(Sum(0));
210 /// assert_eq!(sum.await, Ok(Sum(45)));
211 ///
212 /// let s2 = futures::stream::iter(vec![Ok(Sum(1)), Err(2), Ok(Sum(3))]);
213 /// let empty = s2.try_fold_semigroup(Sum(0));
214 /// assert_eq!(empty.await, Err(2));
215 ///
216 /// let s3 = futures::stream::iter((0..0).map(Sum).map(Ok::<_, Infallible>));
217 /// let empty = s3.try_fold_semigroup(Sum(0));
218 /// assert_eq!(empty.await, Ok(Sum(0)))
219 /// # });
220 /// ```
221 ///
222 /// # Type safety
223 /// This method is only available when item implements [`Commutative`].
224 /// Same to [`CombineStream::fold_semigroup`].
225 fn try_fold_semigroup(
226 self,
227 init: Self::Ok,
228 ) -> impl Future<Output = Result<Self::Ok, Self::Error>>
229 where
230 Self::Ok: TryAsyncCommutative,
231 {
232 Self::Ok::try_fold_stream(init, self)
233 }
234
235 /// This method like [`crate::CombineIterator::lreduce`], but stream.
236 ///
237 /// # Example
238 /// ```
239 /// # futures::executor::block_on(async {
240 /// use std::convert::Infallible;
241 /// use futures::StreamExt;
242 /// use semigroup::{op::Sum, TryCombineStream, Semigroup};
243 /// let s1 = futures::stream::iter(0..10).map(Sum).map(Ok::<_, Infallible>);
244 /// let sum = s1.try_reduce_semigroup();
245 /// assert_eq!(sum.await, Some(Ok(Sum(45))));
246 ///
247 /// let s2 = futures::stream::iter(vec![Ok(Sum(1)), Err(2), Ok(Sum(3))]);
248 /// let empty = s2.try_reduce_semigroup();
249 /// assert_eq!(empty.await, Some(Err(2)));
250 ///
251 /// let s3 = futures::stream::iter((0..0).map(Sum).map(Ok::<_, Infallible>));
252 /// let empty = s3.try_reduce_semigroup();
253 /// assert_eq!(empty.await, None);
254 /// # });
255 /// ```
256 ///
257 /// # Type safety
258 /// This method is only available when item implements [`Commutative`].
259 /// Same to [`CombineStream::reduce_semigroup`].
260 fn try_reduce_semigroup<T, E>(
261 self,
262 ) -> impl Future<Output = Option<Result<Self::Ok, Self::Error>>>
263 where
264 T: TryAsyncCommutative,
265 Self: Unpin,
266 Self: TryStream<Item = Result<T, E>, Ok = T, Error = E>,
267 {
268 Self::Ok::try_reduce_stream(self)
269 }
270 /// This method like [`crate::CombineIterator::combine`], but stream.
271 ///
272 /// # Example
273 /// ```
274 /// # futures::executor::block_on(async {
275 /// use std::convert::Infallible;
276 /// use futures::StreamExt;
277 /// use semigroup::{op::Sum, TryCombineStream, Semigroup};
278 /// let s1 = futures::stream::iter(0..10).map(Sum).map(Ok::<_, Infallible>);
279 /// let sum = s1.try_combine_monoid();
280 /// assert_eq!(sum.await, Ok(Sum(45)));
281 ///
282 /// let s2 = futures::stream::iter(vec![Ok(Sum(1)), Err(2), Ok(Sum(3))]);
283 /// let empty = s2.try_combine_monoid();
284 /// assert_eq!(empty.await, Err(2));
285 ///
286 /// let s3 = futures::stream::iter((0..0).map(Sum).map(Ok::<_, Infallible>));
287 /// let empty = s3.try_combine_monoid();
288 /// assert_eq!(empty.await, Ok(Sum(0)))
289 /// # });
290 /// ```
291 ///
292 /// # Type safety
293 /// This method is only available when item implements [`Commutative`].
294 /// Same to [`CombineStream::combine_monoid`].
295 #[cfg(feature = "monoid")]
296 fn try_combine_monoid(self) -> impl Future<Output = Result<Self::Ok, Self::Error>>
297 where
298 Self::Ok: TryAsyncCommutative + crate::Monoid,
299 {
300 Self::Ok::try_combine_stream(self)
301 }
302}
303impl<T: TryStream> TryCombineStream for T {}
304
305#[cfg(feature = "test")]
306pub mod test_async_semigroup {
307 use std::fmt::Debug;
308
309 use super::*;
310
311 /// Assert that the given type satisfies the *async semigroup* property.
312 ///
313 /// # Usage
314 /// Same to [`crate::assert_semigroup!`].
315 ///
316 /// # Examples
317 /// ```
318 /// use semigroup::{assert_async_semigroup, op::Sum};
319 ///
320 /// let a = Sum(1);
321 /// let b = Sum(2);
322 /// let c = Sum(3);
323 /// futures::executor::block_on(async {
324 /// assert_async_semigroup!(a, b, c);
325 /// });
326 ///
327 /// let v = vec![a, b, c];
328 /// futures::executor::block_on(async {
329 /// assert_async_semigroup!(&v);
330 /// });
331 /// ```
332 ///
333 /// # Panics
334 /// - If the given function does not satisfy the *async semigroup* property.
335 /// ```should_panic
336 /// use semigroup::{assert_async_semigroup, Op, Semigroup};
337 /// #[derive(Debug, Clone, PartialEq, Op)]
338 /// #[op(commutative)]
339 /// pub struct Sub(i32);
340 /// impl Op<i32> for Sub {
341 /// fn lift_op_assign(base: &mut i32, other: i32) {
342 /// *base -= other;
343 /// }
344 /// }
345 /// let a = Sub(1);
346 /// let b = Sub(2);
347 /// let c = Sub(3);
348 /// futures::executor::block_on(async {
349 /// assert_async_semigroup!(a, b, c);
350 /// });
351 /// ```
352 ///
353 /// - The input iterator has less than 3 items.
354 /// ```compile_fail
355 /// use semigroup::{assert_async_semigroup, op::Sum};
356 /// let a = Sum(1);
357 /// let b = Sum(2);
358 /// futures::executor::block_on(async {
359 /// assert_async_semigroup!(a, b);
360 /// });
361 /// ```
362 /// ```should_panic
363 /// use semigroup::{assert_async_semigroup, op::Sum};
364 /// let a = Sum(1);
365 /// let b = Sum(2);
366 /// futures::executor::block_on(async {
367 /// assert_async_semigroup!(&vec![a, b]);
368 /// });
369 /// ```
370 #[macro_export]
371 macro_rules! assert_async_semigroup {
372 ($a:expr, $b: expr, $($tail: expr),*) => {
373 {
374 let v = vec![$a, $b, $($tail),*];
375 $crate::assert_async_semigroup!(&v)
376 }
377 };
378 ($v:expr) => {
379 {
380 let (a, b, c) = $crate::test_semigroup::pick3($v);
381 $crate::test_async_semigroup::assert_async_semigroup_impl(a.clone(), b.clone(), c.clone()).await;
382 }
383 };
384 }
385
386 pub async fn assert_async_semigroup_impl<T: AsyncCommutative + Clone + PartialEq + Debug>(
387 a: T,
388 b: T,
389 c: T,
390 ) {
391 assert_async_associative_law(a.clone(), b.clone(), c.clone()).await;
392 }
393
394 /// Assert that the given type satisfies the *async commutative* property.
395 ///
396 /// # Usage
397 /// Same to [`crate::assert_semigroup!`].
398 ///
399 /// # Examples
400 /// ```
401 /// use semigroup::{assert_async_commutative, op::Sum};
402 ///
403 /// let a = Sum(1);
404 /// let b = Sum(2);
405 /// let c = Sum(3);
406 /// futures::executor::block_on(async {
407 /// assert_async_commutative!(a, b, c);
408 /// });
409 ///
410 /// let v = vec![a, b, c];
411 /// futures::executor::block_on(async {
412 /// assert_async_commutative!(&v);
413 /// });
414 /// ```
415 ///
416 /// # Panics
417 /// - If the given function does not satisfy the *async commutative* property.
418 /// ```should_panic
419 /// use semigroup::{assert_async_commutative, Op, Semigroup};
420 /// #[derive(Debug, Clone, PartialEq, Op)]
421 /// #[op(commutative)]
422 /// pub struct Sub(i32);
423 /// impl Op<i32> for Sub {
424 /// fn lift_op_assign(base: &mut i32, other: i32) {
425 /// *base -= other;
426 /// }
427 /// }
428 /// let a = Sub(1);
429 /// let b = Sub(2);
430 /// let c = Sub(3);
431 /// futures::executor::block_on(async {
432 /// assert_async_commutative!(a, b, c);
433 /// });
434 /// ```
435 ///
436 /// - The input iterator has less than 3 items.
437 /// ```compile_fail
438 /// use semigroup::{assert_async_commutative, op::Sum};
439 /// let a = Sum(1);
440 /// let b = Sum(2);
441 /// futures::executor::block_on(async {
442 /// assert_async_commutative!(a, b);
443 /// });
444 /// ```
445 /// ```should_panic
446 /// use semigroup::{assert_async_commutative, op::Sum};
447 /// let a = Sum(1);
448 /// let b = Sum(2);
449 /// futures::executor::block_on(async {
450 /// assert_async_commutative!(&vec![a, b]);
451 /// });
452 /// ```
453 #[macro_export]
454 macro_rules! assert_async_commutative {
455 ($a:expr, $b: expr, $($tail: expr),*) => {
456 {
457 let v = vec![$a, $b, $($tail),*];
458 $crate::assert_async_commutative!(&v)
459 }
460 };
461 ($v:expr) => {
462 {
463 let (a, b, c) = $crate::test_semigroup::pick3($v);
464 $crate::test_async_semigroup::assert_async_commutative_impl(a.clone(), b.clone(), c.clone()).await;
465 }
466 };
467 }
468
469 pub async fn assert_async_commutative_impl<T: AsyncCommutative + Clone + PartialEq + Debug>(
470 a: T,
471 b: T,
472 c: T,
473 ) {
474 assert_async_commutative_law(a.clone(), b.clone(), c.clone()).await;
475 }
476
477 pub async fn assert_async_associative_law<T: AsyncSemigroup + Clone + PartialEq + Debug>(
478 a: T,
479 b: T,
480 c: T,
481 ) {
482 let ab_c = AsyncSemigroup::async_op(
483 AsyncSemigroup::async_op(a.clone(), b.clone()).await,
484 c.clone(),
485 )
486 .await;
487 let a_bc = AsyncSemigroup::async_op(
488 a.clone(),
489 AsyncSemigroup::async_op(b.clone(), c.clone()).await,
490 )
491 .await;
492 assert_eq!(ab_c, a_bc);
493 }
494
495 pub async fn assert_async_commutative_law<T: AsyncCommutative + Clone + PartialEq + Debug>(
496 a: T,
497 b: T,
498 c: T,
499 ) {
500 let abc = AsyncSemigroup::async_op(
501 AsyncSemigroup::async_op(a.clone(), b.clone()).await,
502 c.clone(),
503 )
504 .await;
505 let bca = AsyncSemigroup::async_op(
506 AsyncSemigroup::async_op(b.clone(), c.clone()).await,
507 a.clone(),
508 )
509 .await;
510 let cba = AsyncSemigroup::async_op(
511 AsyncSemigroup::async_op(c.clone(), b.clone()).await,
512 a.clone(),
513 )
514 .await;
515 let acb = AsyncSemigroup::async_op(
516 AsyncSemigroup::async_op(a.clone(), c.clone()).await,
517 b.clone(),
518 )
519 .await;
520 let bac = AsyncSemigroup::async_op(
521 AsyncSemigroup::async_op(b.clone(), a.clone()).await,
522 c.clone(),
523 )
524 .await;
525 let cab = AsyncSemigroup::async_op(
526 AsyncSemigroup::async_op(c.clone(), a.clone()).await,
527 b.clone(),
528 )
529 .await;
530
531 assert_eq!(abc, bca);
532 assert_eq!(bca, cba);
533 assert_eq!(cba, acb);
534 assert_eq!(acb, bac);
535 assert_eq!(bac, cab);
536 assert_eq!(cab, abc);
537 }
538}