Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ec204fa
Replace async-std with tokio in PVF subsystem
mrcnski Dec 11, 2022
3abc804
Rework workers to use `select!` instead of a mutex
mrcnski Dec 13, 2022
5fcc67d
Remove unnecessary `fuse`
mrcnski Dec 13, 2022
1574561
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Dec 13, 2022
451fae0
Add explanation for `expect()`
mrcnski Dec 13, 2022
fc4c28b
Update node/core/pvf/src/worker_common.rs
mrcnski Dec 18, 2022
1dde78b
Update node/core/pvf/src/worker_common.rs
mrcnski Dec 18, 2022
da31a48
Address some review comments
mrcnski Dec 18, 2022
35a0c79
Merge remote-tracking branch 'origin/m-cat/replace-async-std-pvf' int…
mrcnski Dec 18, 2022
e1c2cf3
Shutdown tokio runtime
mrcnski Dec 18, 2022
077a123
Run cargo fmt
mrcnski Dec 19, 2022
e0d4b9e
Add a small note about retries
mrcnski Dec 19, 2022
2353747
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Dec 20, 2022
28d4062
Fix up merge
mrcnski Dec 20, 2022
3964aca
Rework `cpu_time_monitor_loop` to return when other thread finishes
mrcnski Dec 20, 2022
7057518
Add error string to PrepareError::IoErr variant
mrcnski Dec 20, 2022
e6ba098
Log when artifacts fail to prepare
mrcnski Dec 20, 2022
e094f80
Fix `cpu_time_monitor_loop`; fix test
mrcnski Dec 20, 2022
c09377a
Fix text
mrcnski Dec 20, 2022
05d1865
Fix a couple of potential minor data races.
mrcnski Dec 22, 2022
b0c2434
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Jan 5, 2023
5cc477c
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Jan 9, 2023
0f4ac06
Update Cargo.lock
mrcnski Jan 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix a couple of potential minor data races.
The first data race was due to logging in the CPU monitor thread even if the
job (i.e. the other thread) had already finished; the job can technically finish before or after the log is emitted.

It would arguably be best to move this log into the `select!` blocks, where we are
guaranteed to have taken the timed-out branch, although that would introduce a bit
of duplication.

Also, it was possible for this thread to complete before `finished_tx.send` was
executed in the other thread, which would trigger an error since the receiver
would already have been dropped. Previously, such a spurious error from `send`
would be returned even if the job had otherwise succeeded.
  • Loading branch information
mrcnski committed Dec 22, 2022
commit 05d1865adee9d0f6475f267e29d0ca45c305a6a8
24 changes: 15 additions & 9 deletions node/core/pvf/src/execute/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
executor_intf::Executor,
worker_common::{
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, worker_event_loop, IdleWorker, JobKind, SpawnErr, WorkerHandle,
spawn_with_program_path, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
LOG_TARGET,
Expand Down Expand Up @@ -248,12 +248,7 @@ pub fn worker_entrypoint(socket_path: &str) {
// Spawn a new thread that runs the CPU time monitor.
let thread_fut = rt_handle
.spawn_blocking(move || {
cpu_time_monitor_loop(
JobKind::Execute,
cpu_time_start,
execution_timeout,
finished_rx,
);
cpu_time_monitor_loop(cpu_time_start, execution_timeout, finished_rx)
})
.fuse();
let executor_2 = executor.clone();
Expand All @@ -271,12 +266,23 @@ pub fn worker_entrypoint(socket_path: &str) {
// finish in the background.
join_res = thread_fut => {
match join_res {
Ok(()) => Response::TimedOut,
Ok(Some(cpu_time_elapsed)) => {
// Log if we exceed the timeout and the other thread hasn't finished.
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"execute job took {}ms cpu time, exceeded execute timeout {}ms",
cpu_time_elapsed.as_millis(),
execution_timeout.as_millis(),
);
Response::TimedOut
},
Ok(None) => Response::InternalError("error communicating over finished channel".into()),
Err(e) => Response::InternalError(format!("{}", e)),
}
},
execute_res = execute_fut => {
finished_tx.send(()).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
let _ = finished_tx.send(());
execute_res.unwrap_or_else(|e| Response::InternalError(format!("{}", e)))
},
};
Expand Down
26 changes: 16 additions & 10 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::{
error::{PrepareError, PrepareResult},
worker_common::{
bytes_to_path, cpu_time_monitor_loop, framed_recv, framed_send, path_to_bytes,
spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, JobKind, SpawnErr,
WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle,
JOB_TIMEOUT_WALL_CLOCK_FACTOR,
},
LOG_TARGET,
};
Expand Down Expand Up @@ -316,12 +316,7 @@ pub fn worker_entrypoint(socket_path: &str) {
// Spawn a new thread that runs the CPU time monitor.
let thread_fut = rt_handle
.spawn_blocking(move || {
cpu_time_monitor_loop(
JobKind::Prepare,
cpu_time_start,
preparation_timeout,
finished_rx,
)
cpu_time_monitor_loop(cpu_time_start, preparation_timeout, finished_rx)
})
.fuse();
let prepare_fut = rt_handle.spawn_blocking(move || prepare_artifact(&code)).fuse();
Expand All @@ -334,13 +329,24 @@ pub fn worker_entrypoint(socket_path: &str) {
// finish in the background.
join_res = thread_fut => {
match join_res {
Ok(()) => Err(PrepareError::TimedOut),
Ok(Some(cpu_time_elapsed)) => {
// Log if we exceed the timeout and the other thread hasn't finished.
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
cpu_time_elapsed.as_millis(),
preparation_timeout.as_millis(),
);
Err(PrepareError::TimedOut)
},
Ok(None) => Err(PrepareError::IoErr("error communicating over finished channel".into())),
Err(err) => Err(PrepareError::IoErr(err.to_string())),
}
},
compilation_res = prepare_fut => {
let cpu_time_elapsed = cpu_time_start.elapsed();
finished_tx.send(()).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
let _ = finished_tx.send(());

match compilation_res.unwrap_or_else(|err| Err(PrepareError::IoErr(err.to_string()))) {
Err(err) => {
Expand Down
45 changes: 10 additions & 35 deletions node/core/pvf/src/worker_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,6 @@ pub const JOB_TIMEOUT_WALL_CLOCK_FACTOR: u32 = 4;
/// child process.
pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50);

#[derive(Copy, Clone, Debug)]
pub enum JobKind {
Prepare,
Execute,
}

impl fmt::Display for JobKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Prepare => write!(f, "prepare"),
Self::Execute => write!(f, "execute"),
}
}
}

/// This is publicly exposed only for integration tests.
#[doc(hidden)]
pub async fn spawn_with_program_path(
Expand Down Expand Up @@ -206,20 +191,19 @@ where
}

/// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up
/// from sleeping and then either sleeps for the remaining CPU time, or returns if we exceed the CPU
/// timeout.
/// and then either blocks for the remaining CPU time, or returns if we exceed the CPU timeout.
///
/// Returning indicates that we should send a `TimedOut` error to the host.
/// Returning `Some` indicates that we should send a `TimedOut` error to the host. Will return
/// `None` if the other thread finishes first, without us timing out.
///
/// NOTE: Returning will cause the worker, whether preparation or execution, to be killed by the
/// host. We do not kill the process here because it would interfere with the proper handling of
/// this error.
/// NOTE: Sending a `TimedOut` error to the host will cause the worker, whether preparation or
/// execution, to be killed by the host. We do not kill the process here because it would interfere
/// with the proper handling of this error.
pub fn cpu_time_monitor_loop(
job_kind: JobKind,
cpu_time_start: ProcessTime,
timeout: Duration,
finished_rx: Receiver<()>,
) {
) -> Option<Duration> {
loop {
let cpu_time_elapsed = cpu_time_start.elapsed();

Expand All @@ -230,23 +214,14 @@ pub fn cpu_time_monitor_loop(
let sleep_interval = timeout.saturating_sub(cpu_time_elapsed) + JOB_TIMEOUT_OVERHEAD;
match finished_rx.recv_timeout(sleep_interval) {
// Received finish signal.
Ok(()) => return,
Ok(()) => return None,
// Timed out, restart loop.
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => return,
Err(RecvTimeoutError::Disconnected) => return None,
}
}

// Log if we exceed the timeout.
gum::warn!(
target: LOG_TARGET,
worker_pid = %std::process::id(),
"{job_kind} job took {}ms cpu time, exceeded {job_kind} timeout {}ms",
cpu_time_elapsed.as_millis(),
timeout.as_millis(),
);

return
return Some(cpu_time_elapsed)
}
}

Expand Down