use std::{sync::mpsc, thread::sleep, time::Duration}; use async_broadcast::*; use futures_util::{future::join, stream::StreamExt}; use easy_parallel::Parallel; use futures_lite::future::block_on; fn ms(ms: u64) -> Duration { Duration::from_millis(ms) } #[test] fn basic_sync() { let (s, mut r1) = broadcast(10); let mut r2 = r1.clone(); s.try_broadcast(7).unwrap(); assert_eq!(r1.try_recv().unwrap(), 7); assert_eq!(r2.try_recv().unwrap(), 7); let mut r3 = r1.clone(); s.try_broadcast(8).unwrap(); assert_eq!(r1.try_recv().unwrap(), 8); assert_eq!(r2.try_recv().unwrap(), 8); assert_eq!(r3.try_recv().unwrap(), 8); } #[test] fn basic_async() { block_on(async { let (s, mut r1) = broadcast(10); let mut r2 = r1.clone(); s.broadcast(7).await.unwrap(); assert_eq!(r1.recv().await.unwrap(), 7); assert_eq!(r2.recv().await.unwrap(), 7); // Now let's try the Stream impl. let mut r3 = r1.clone(); s.broadcast(8).await.unwrap(); assert_eq!(r1.next().await.unwrap(), 8); assert_eq!(r2.next().await.unwrap(), 8); assert_eq!(r3.next().await.unwrap(), 8); }); } #[cfg(not(target_family = "wasm"))] #[test] fn basic_blocking() { let (s, mut r) = broadcast(1); s.broadcast_blocking(7).unwrap(); assert_eq!(r.try_recv(), Ok(7)); s.broadcast_blocking(8).unwrap(); assert_eq!(block_on(r.recv()), Ok(8)); block_on(s.broadcast(9)).unwrap(); assert_eq!(r.recv_blocking(), Ok(9)); assert_eq!(r.try_recv(), Err(TryRecvError::Empty)); } #[test] fn parallel() { let (s1, mut r1) = broadcast(2); let s2 = s1.clone(); let mut r2 = r1.clone(); let (sender_sync_send, sender_sync_recv) = mpsc::channel(); let (receiver_sync_send, receiver_sync_recv) = mpsc::channel(); Parallel::new() .add(move || { sender_sync_recv.recv().unwrap(); s1.try_broadcast(7).unwrap(); s2.try_broadcast(8).unwrap(); assert!(s2.try_broadcast(9).unwrap_err().is_full()); assert!(s1.try_broadcast(10).unwrap_err().is_full()); receiver_sync_send.send(()).unwrap(); drop(s1); drop(s2); receiver_sync_send.send(()).unwrap(); }) .add(move || { assert_eq!(r1.try_recv(), Err(TryRecvError::Empty)); assert_eq!(r2.try_recv(), Err(TryRecvError::Empty)); sender_sync_send.send(()).unwrap(); receiver_sync_recv.recv().unwrap(); assert_eq!(r1.try_recv().unwrap(), 7); assert_eq!(r1.try_recv().unwrap(), 8); assert_eq!(r2.try_recv().unwrap(), 7); assert_eq!(r2.try_recv().unwrap(), 8); receiver_sync_recv.recv().unwrap(); assert_eq!(r1.try_recv(), Err(TryRecvError::Closed)); assert_eq!(r2.try_recv(), Err(TryRecvError::Closed)); }) .run(); } #[test] fn parallel_async() { let (s1, mut r1) = broadcast(2); let s2 = s1.clone(); let mut r2 = r1.clone(); let (sender_sync_send, sender_sync_recv) = mpsc::channel(); let (receiver_sync_send, receiver_sync_recv) = mpsc::channel(); Parallel::new() .add(move || { block_on(async move { sender_sync_recv.recv().unwrap(); sleep(ms(5)); s1.broadcast(7).await.unwrap(); s2.broadcast(8).await.unwrap(); assert!(s2.try_broadcast(9).unwrap_err().is_full()); assert!(s1.try_broadcast(10).unwrap_err().is_full()); receiver_sync_send.send(()).unwrap(); s1.broadcast(9).await.unwrap(); s2.broadcast(10).await.unwrap(); drop(s1); drop(s2); receiver_sync_send.send(()).unwrap(); }) }) .add(move || { block_on(async move { assert_eq!(r1.try_recv(), Err(TryRecvError::Empty)); assert_eq!(r2.try_recv(), Err(TryRecvError::Empty)); sender_sync_send.send(()).unwrap(); receiver_sync_recv.recv().unwrap(); assert_eq!(r1.next().await.unwrap(), 7); assert_eq!(r2.next().await.unwrap(), 7); assert_eq!(r1.recv().await.unwrap(), 8); assert_eq!(r2.recv().await.unwrap(), 8); receiver_sync_recv.recv().unwrap(); sleep(ms(5)); assert_eq!(r1.next().await.unwrap(), 9); assert_eq!(r2.next().await.unwrap(), 9); assert_eq!(r1.recv().await.unwrap(), 10); assert_eq!(r2.recv().await.unwrap(), 10); assert_eq!(r1.recv().await, Err(RecvError::Closed)); assert_eq!(r2.recv().await, Err(RecvError::Closed)); }) }) .run(); } #[test] fn channel_shrink() { let (s1, mut r1) = broadcast(4); let mut r2 = r1.clone(); let mut r3 = r1.clone(); let mut r4 = r1.clone(); s1.try_broadcast(1).unwrap(); s1.try_broadcast(2).unwrap(); s1.try_broadcast(3).unwrap(); s1.try_broadcast(4).unwrap(); assert_eq!(r2.try_recv().unwrap(), 1); assert_eq!(r2.try_recv().unwrap(), 2); assert_eq!(r3.try_recv().unwrap(), 1); assert_eq!(r3.try_recv().unwrap(), 2); assert_eq!(r3.try_recv().unwrap(), 3); assert_eq!(r4.try_recv().unwrap(), 1); assert_eq!(r4.try_recv().unwrap(), 2); assert_eq!(r4.try_recv().unwrap(), 3); assert_eq!(r4.try_recv().unwrap(), 4); r1.set_capacity(2); assert_eq!(r1.try_recv(), Err(TryRecvError::Overflowed(2))); assert_eq!(r1.try_recv().unwrap(), 3); assert_eq!(r1.try_recv().unwrap(), 4); assert_eq!(r1.try_recv(), Err(TryRecvError::Empty)); assert_eq!(r2.try_recv().unwrap(), 3); assert_eq!(r2.try_recv().unwrap(), 4); assert_eq!(r2.try_recv(), Err(TryRecvError::Empty)); assert_eq!(r3.try_recv().unwrap(), 4); assert_eq!(r3.try_recv(), Err(TryRecvError::Empty)); assert_eq!(r4.try_recv(), Err(TryRecvError::Empty)); } #[test] fn overflow() { let (s1, mut r1) = broadcast(2); r1.set_overflow(true); // We'll keep r1 as the lagging receiver. let mut r2 = r1.clone(); let mut r3 = r1.clone(); let (sender_sync_send, sender_sync_recv) = mpsc::channel(); Parallel::new() .add(move || { block_on(async move { s1.broadcast(7).await.unwrap(); s1.broadcast(8).await.unwrap(); sender_sync_recv.recv().unwrap(); sleep(ms(5)); s1.broadcast(9).await.unwrap(); sender_sync_recv.recv().unwrap(); }) }) .add(move || { block_on(async move { assert_eq!(r2.next().await.unwrap(), 7); assert_eq!(r2.recv().await.unwrap(), 8); sender_sync_send.send(()).unwrap(); assert_eq!(r2.next().await.unwrap(), 9); sender_sync_send.send(()).unwrap(); }) }) .add(move || { block_on(async move { assert_eq!(r3.next().await.unwrap(), 7); assert_eq!(r3.recv().await.unwrap(), 8); assert_eq!(r3.next().await.unwrap(), 9); }) }) .run(); assert_eq!(r1.try_recv(), Err(TryRecvError::Overflowed(1))); assert_eq!(r1.try_recv().unwrap(), 8); assert_eq!(r1.try_recv().unwrap(), 9); } #[test] fn open_channel() { let (s1, r) = broadcast(2); let inactive = r.deactivate(); let s2 = s1.clone(); let (receiver_sync_send, receiver_sync_recv) = mpsc::channel(); let (sender_sync_send, sender_sync_recv) = mpsc::channel(); Parallel::new() .add(move || { block_on(async move { receiver_sync_send.send(()).unwrap(); let (result1, result2) = join(s1.broadcast(7), s2.broadcast(8)).await; result1.unwrap(); result2.unwrap(); sender_sync_recv.recv().unwrap(); assert_eq!(s1.try_broadcast(9), Err(TrySendError::Inactive(9))); assert_eq!(s2.try_broadcast(10), Err(TrySendError::Inactive(10))); receiver_sync_send.send(()).unwrap(); sleep(ms(5)); s1.broadcast(9).await.unwrap(); s2.broadcast(10).await.unwrap(); }) }) .add(move || { block_on(async move { receiver_sync_recv.recv().unwrap(); sleep(ms(5)); let mut r = inactive.activate_cloned(); assert_eq!(r.next().await.unwrap(), 7); assert_eq!(r.recv().await.unwrap(), 8); drop(r); sender_sync_send.send(()).unwrap(); receiver_sync_recv.recv().unwrap(); let mut r = inactive.activate(); assert_eq!(r.recv().await.unwrap(), 9); assert_eq!(r.recv().await.unwrap(), 10); }) }) .run(); } #[test] fn inactive_drop() { let (s, active_receiver) = broadcast::<()>(1); let inactive = active_receiver.deactivate(); let inactive2 = inactive.clone(); drop(inactive); drop(inactive2); assert!(s.is_closed()) } #[test] fn poll_recv() { let (s, mut r) = broadcast::(2); r.set_overflow(true); // A quick custom stream impl to demonstrate/test `poll_recv`. struct MyStream(Receiver); impl futures_core::Stream for MyStream { type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { std::pin::Pin::new(&mut self.0).poll_recv(cx) } } block_on(async move { let mut stream = MyStream(r); s.broadcast(1).await.unwrap(); s.broadcast(2).await.unwrap(); s.broadcast(3).await.unwrap(); s.broadcast(4).await.unwrap(); assert_eq!(stream.next().await.unwrap(), Err(RecvError::Overflowed(2))); assert_eq!(stream.next().await.unwrap(), Ok(3)); assert_eq!(stream.next().await.unwrap(), Ok(4)); drop(s); assert_eq!(stream.next().await, None); }) }