|
|
|
|
@ -97,25 +97,92 @@ impl<J: SequentialJob> Queue<J> {
|
|
|
|
|
mod tests { |
|
|
|
|
use super::*; |
|
|
|
|
|
|
|
|
|
use std::sync::Arc; |
|
|
|
|
use std::thread; |
|
|
|
|
|
|
|
|
|
use std::sync::Arc; |
|
|
|
|
use std::time::Duration; |
|
|
|
|
|
|
|
|
|
use rand::thread_rng; |
|
|
|
|
use rand::Rng; |
|
|
|
|
|
|
|
|
|
struct TestJob {} |
|
|
|
|
#[test] |
|
|
|
|
fn test_consume_queue() { |
|
|
|
|
struct TestJob { |
|
|
|
|
cnt: Arc<AtomicUsize>, |
|
|
|
|
wait_sequential: Duration, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl SequentialJob for TestJob { |
|
|
|
|
fn is_ready(&self) -> bool { |
|
|
|
|
true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn sequential_work(self) { |
|
|
|
|
thread::sleep(self.wait_sequential); |
|
|
|
|
self.cnt.fetch_add(1, Ordering::SeqCst); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn hammer(queue: &Arc<Queue<TestJob>>, cnt: Arc<AtomicUsize>) -> usize { |
|
|
|
|
let mut jobs = 0; |
|
|
|
|
let mut rng = thread_rng(); |
|
|
|
|
for _ in 0..10_000 { |
|
|
|
|
if rng.gen() { |
|
|
|
|
let wait_sequential: u64 = rng.gen(); |
|
|
|
|
let wait_sequential = wait_sequential % 1000; |
|
|
|
|
|
|
|
|
|
let wait_parallel: u64 = rng.gen(); |
|
|
|
|
let wait_parallel = wait_parallel % 1000; |
|
|
|
|
|
|
|
|
|
impl SequentialJob for TestJob { |
|
|
|
|
fn is_ready(&self) -> bool { |
|
|
|
|
true |
|
|
|
|
thread::sleep(Duration::from_micros(wait_parallel)); |
|
|
|
|
|
|
|
|
|
queue.push(TestJob { |
|
|
|
|
cnt: cnt.clone(), |
|
|
|
|
wait_sequential: Duration::from_micros(wait_sequential), |
|
|
|
|
}); |
|
|
|
|
jobs += 1; |
|
|
|
|
} else { |
|
|
|
|
queue.consume(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
queue.consume(); |
|
|
|
|
jobs |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn sequential_work(self) {} |
|
|
|
|
let queue = Arc::new(Queue::new()); |
|
|
|
|
let counter = Arc::new(AtomicUsize::new(0)); |
|
|
|
|
|
|
|
|
|
// repeatedly apply operations randomly from concurrent threads
|
|
|
|
|
let other = { |
|
|
|
|
let queue = queue.clone(); |
|
|
|
|
let counter = counter.clone(); |
|
|
|
|
thread::spawn(move || hammer(&queue, counter)) |
|
|
|
|
}; |
|
|
|
|
let mut jobs = hammer(&queue, counter.clone()); |
|
|
|
|
|
|
|
|
|
// wait, consume and check empty
|
|
|
|
|
jobs += other.join().unwrap(); |
|
|
|
|
assert_eq!(queue.queue.lock().len(), 0, "elements left in queue"); |
|
|
|
|
assert_eq!( |
|
|
|
|
jobs, |
|
|
|
|
counter.load(Ordering::Acquire), |
|
|
|
|
"did not consume every job" |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Fuzz the Queue */ |
|
|
|
|
#[test] |
|
|
|
|
fn test_queue() { |
|
|
|
|
fn test_fuzz_queue() { |
|
|
|
|
struct TestJob {} |
|
|
|
|
|
|
|
|
|
impl SequentialJob for TestJob { |
|
|
|
|
fn is_ready(&self) -> bool { |
|
|
|
|
true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn sequential_work(self) {} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn hammer(queue: &Arc<Queue<TestJob>>) { |
|
|
|
|
let mut rng = thread_rng(); |
|
|
|
|
for _ in 0..1_000_000 { |
|
|
|
|
|