195 lines
4.4 KiB
Rust
195 lines
4.4 KiB
Rust
mod common;
|
|
|
|
use std::future::Future;
|
|
use std::mem::forget;
|
|
use std::pin::Pin;
|
|
use std::sync::{
|
|
atomic::{AtomicUsize, Ordering},
|
|
mpsc, Arc,
|
|
};
|
|
use std::task::Context;
|
|
use std::task::Poll;
|
|
use std::thread;
|
|
|
|
use common::check_yields_when_contended;
|
|
|
|
use async_lock::Semaphore;
|
|
use futures_lite::{future, pin};
|
|
|
|
#[test]
|
|
fn try_acquire() {
|
|
let s = Semaphore::new(2);
|
|
let g1 = s.try_acquire().unwrap();
|
|
let _g2 = s.try_acquire().unwrap();
|
|
|
|
assert!(s.try_acquire().is_none());
|
|
drop(g1);
|
|
assert!(s.try_acquire().is_some());
|
|
}
|
|
|
|
#[test]
|
|
fn stress() {
|
|
const COUNT: usize = if cfg!(miri) { 500 } else { 10_000 };
|
|
|
|
let s = Arc::new(Semaphore::new(5));
|
|
let (tx, rx) = mpsc::channel::<()>();
|
|
|
|
for _ in 0..50 {
|
|
let s = s.clone();
|
|
let tx = tx.clone();
|
|
|
|
thread::spawn(move || {
|
|
future::block_on(async {
|
|
for _ in 0..COUNT {
|
|
s.acquire().await;
|
|
}
|
|
drop(tx);
|
|
})
|
|
});
|
|
}
|
|
|
|
drop(tx);
|
|
let _ = rx.recv();
|
|
|
|
let _g1 = s.try_acquire().unwrap();
|
|
let g2 = s.try_acquire().unwrap();
|
|
let _g3 = s.try_acquire().unwrap();
|
|
let _g4 = s.try_acquire().unwrap();
|
|
let _g5 = s.try_acquire().unwrap();
|
|
|
|
assert!(s.try_acquire().is_none());
|
|
drop(g2);
|
|
assert!(s.try_acquire().is_some());
|
|
}
|
|
|
|
#[test]
|
|
fn as_mutex() {
|
|
let s = Arc::new(Semaphore::new(1));
|
|
let s2 = s.clone();
|
|
let _t = thread::spawn(move || {
|
|
future::block_on(async {
|
|
let _g = s2.acquire().await;
|
|
});
|
|
});
|
|
future::block_on(async {
|
|
let _g = s.acquire().await;
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn multi_resource() {
|
|
let s = Arc::new(Semaphore::new(2));
|
|
let s2 = s.clone();
|
|
let (tx1, rx1) = mpsc::channel();
|
|
let (tx2, rx2) = mpsc::channel();
|
|
let _t = thread::spawn(move || {
|
|
future::block_on(async {
|
|
let _g = s2.acquire().await;
|
|
let _ = rx2.recv();
|
|
tx1.send(()).unwrap();
|
|
});
|
|
});
|
|
future::block_on(async {
|
|
let _g = s.acquire().await;
|
|
tx2.send(()).unwrap();
|
|
rx1.recv().unwrap();
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn lifetime() {
|
|
// Show that the future keeps the semaphore alive.
|
|
let _fut = {
|
|
let mutex = Arc::new(Semaphore::new(2));
|
|
mutex.acquire_arc()
|
|
};
|
|
}
|
|
|
|
#[test]
|
|
fn yields_when_contended() {
|
|
let s = Semaphore::new(1);
|
|
check_yields_when_contended(s.try_acquire().unwrap(), s.acquire());
|
|
|
|
let s = Arc::new(s);
|
|
check_yields_when_contended(s.try_acquire_arc().unwrap(), s.acquire_arc());
|
|
}
|
|
|
|
#[cfg(all(feature = "std", not(target_family = "wasm")))]
|
|
#[test]
|
|
fn smoke_blocking() {
|
|
let s = Semaphore::new(2);
|
|
let g1 = s.acquire_blocking();
|
|
let _g2 = s.acquire_blocking();
|
|
assert!(s.try_acquire().is_none());
|
|
drop(g1);
|
|
assert!(s.try_acquire().is_some());
|
|
}
|
|
|
|
#[cfg(all(feature = "std", not(target_family = "wasm")))]
|
|
#[test]
|
|
fn smoke_arc_blocking() {
|
|
let s = Arc::new(Semaphore::new(2));
|
|
let g1 = s.acquire_arc_blocking();
|
|
let _g2 = s.acquire_arc_blocking();
|
|
assert!(s.try_acquire().is_none());
|
|
drop(g1);
|
|
assert!(s.try_acquire().is_some());
|
|
}
|
|
|
|
#[test]
|
|
fn add_permits() {
|
|
static COUNTER: AtomicUsize = AtomicUsize::new(0);
|
|
|
|
let s = Arc::new(Semaphore::new(0));
|
|
let (tx, rx) = mpsc::channel::<()>();
|
|
|
|
for _ in 0..50 {
|
|
let s = s.clone();
|
|
let tx = tx.clone();
|
|
|
|
thread::spawn(move || {
|
|
future::block_on(async {
|
|
let perm = s.acquire().await;
|
|
forget(perm);
|
|
COUNTER.fetch_add(1, Ordering::Relaxed);
|
|
drop(tx);
|
|
})
|
|
});
|
|
}
|
|
|
|
assert_eq!(COUNTER.load(Ordering::Relaxed), 0);
|
|
|
|
s.add_permits(50);
|
|
|
|
drop(tx);
|
|
let _ = rx.recv();
|
|
|
|
assert_eq!(COUNTER.load(Ordering::Relaxed), 50);
|
|
}
|
|
|
|
#[test]
|
|
fn add_permits_2() {
|
|
future::block_on(AddPermitsTest);
|
|
}
|
|
|
|
struct AddPermitsTest;
|
|
|
|
impl Future for AddPermitsTest {
|
|
type Output = ();
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
|
|
let s = Semaphore::new(0);
|
|
let acq = s.acquire();
|
|
pin!(acq);
|
|
let acq_2 = s.acquire();
|
|
pin!(acq_2);
|
|
assert!(acq.as_mut().poll(cx).is_pending());
|
|
assert!(acq_2.as_mut().poll(cx).is_pending());
|
|
s.add_permits(1);
|
|
let g = acq.poll(cx);
|
|
assert!(g.is_ready());
|
|
assert!(acq_2.poll(cx).is_pending());
|
|
|
|
Poll::Ready(())
|
|
}
|
|
}
|