Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Use wait_while for Condvar in Queue to simplify code.
  • Loading branch information
ehuss committed Mar 10, 2020
commit 05a1f43a7ffa47464ec84a8b37532c7cd2b23157
51 changes: 20 additions & 31 deletions src/cargo/util/queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};
use std::time::Duration;

/// A simple, threadsafe, queue of items of type `T`
///
Expand Down Expand Up @@ -40,41 +40,30 @@ impl<T> Queue<T> {

/// Pushes an item onto the queue, blocking if the queue is full.
pub fn push_bounded(&self, item: T) {
let mut state = self.state.lock().unwrap();
loop {
if state.items.len() >= self.bound {
state = self.bounded_cv.wait(state).unwrap();
} else {
state.items.push_back(item);
self.popper_cv.notify_one();
break;
}
}
let locked_state = self.state.lock().unwrap();
let mut state = self
.bounded_cv
.wait_while(locked_state, |s| s.items.len() >= self.bound)
.unwrap();
state.items.push_back(item);
self.popper_cv.notify_one();
}

pub fn pop(&self, timeout: Duration) -> Option<T> {
let mut state = self.state.lock().unwrap();
let now = Instant::now();
while state.items.is_empty() {
let elapsed = now.elapsed();
if elapsed >= timeout {
break;
}
let (lock, result) = self
.popper_cv
.wait_timeout(state, timeout - elapsed)
.unwrap();
state = lock;
if result.timed_out() {
break;
let (mut state, result) = self
.popper_cv
.wait_timeout_while(self.state.lock().unwrap(), timeout, |s| s.items.is_empty())
.unwrap();
if result.timed_out() {
None
} else {
let value = state.items.pop_front()?;
if state.items.len() < self.bound {
// Assumes threads cannot be canceled.
self.bounded_cv.notify_one();
}
Some(value)
}
let value = state.items.pop_front()?;
if state.items.len() < self.bound {
// Assumes threads cannot be canceled.
self.bounded_cv.notify_one();
}
Some(value)
}

pub fn try_pop_all(&self) -> Vec<T> {
Expand Down