-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Retry failed PVF prepare jobs #6213
Changes from 1 commit
adc9460
ebd600c
08d103f
086b2fd
84e5764
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,6 +48,13 @@ pub const PRECHECK_COMPILATION_TIMEOUT: Duration = Duration::from_secs(60); | |
| // NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric. | ||
| pub const EXECUTE_COMPILATION_TIMEOUT: Duration = Duration::from_secs(180); | ||
|
|
||
| /// The time period after which a failed preparation artifact is considered ready to be retried. Note that we will only | ||
|
||
| /// retry if another request comes in after this cooldown has passed. | ||
| pub const PREPARE_FAILURE_COOLDOWN: Duration = Duration::from_secs(15 * 60); | ||
|
|
||
| /// The number of times we will retry failed prepare jobs. | ||
| pub const NUM_PREPARE_RETRIES: u32 = 5; | ||
|
|
||
| /// An alias to not spell the type for the oneshot sender for the PVF execution result. | ||
| pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>; | ||
|
|
||
|
|
@@ -360,6 +367,8 @@ async fn run( | |
| Some(to_host) => to_host, | ||
| }; | ||
|
|
||
| // If the artifact failed before, it could be re-scheduled for preparation here if | ||
| // the preparation failure cooldown has elapsed. | ||
| break_if_fatal!(handle_to_host( | ||
| &cache_path, | ||
| &mut artifacts, | ||
|
|
@@ -376,9 +385,9 @@ async fn run( | |
| // Note that preparation always succeeds. | ||
| // | ||
| // That's because the error conditions are written into the artifact and will be | ||
| // reported at the time of the execution. It potentially, but not necessarily, | ||
| // can be scheduled as a result of this function call, in case there are pending | ||
| // executions. | ||
| // reported at the time of the execution. It potentially, but not necessarily, can | ||
| // be scheduled for execution as a result of this function call, in case there are | ||
| // pending executions. | ||
| // | ||
| // We could be eager in terms of reporting and plumb the result from the preparation | ||
| // worker but we don't for the sake of simplicity. | ||
|
|
@@ -432,6 +441,8 @@ async fn handle_to_host( | |
| /// Handles PVF prechecking. | ||
| /// | ||
| /// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_COMPILATION_TIMEOUT`]). | ||
| /// | ||
| /// If the prepare job failed previously, we may retry it under certain conditions. | ||
| async fn handle_precheck_pvf( | ||
| artifacts: &mut Artifacts, | ||
| prepare_queue: &mut mpsc::Sender<prepare::ToQueue>, | ||
|
|
@@ -446,11 +457,28 @@ async fn handle_precheck_pvf( | |
| *last_time_needed = SystemTime::now(); | ||
| let _ = result_sender.send(Ok(())); | ||
| }, | ||
| ArtifactState::Preparing { waiting_for_response } => | ||
| ArtifactState::Preparing { waiting_for_response, num_failures: _ } => | ||
| waiting_for_response.push(result_sender), | ||
| ArtifactState::FailedToProcess(result) => { | ||
| let _ = result_sender.send(PrepareResult::Err(result.clone())); | ||
| }, | ||
| ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => | ||
| if can_retry_prepare_after_failure(*last_time_failed, *num_failures) { | ||
|
||
| // If we are allowed to retry the failed prepare job, change the state to | ||
| // Preparing and re-queue this job. | ||
| *state = ArtifactState::Preparing { | ||
| waiting_for_response: vec![result_sender], | ||
| num_failures: *num_failures, | ||
| }; | ||
| send_prepare( | ||
| prepare_queue, | ||
| prepare::ToQueue::Enqueue { | ||
| priority: Priority::Normal, | ||
| pvf, | ||
| compilation_timeout: PRECHECK_COMPILATION_TIMEOUT, | ||
| }, | ||
| ) | ||
| .await?; | ||
| } else { | ||
| let _ = result_sender.send(PrepareResult::Err(error.clone())); | ||
| }, | ||
| } | ||
| } else { | ||
| artifacts.insert_preparing(artifact_id, vec![result_sender]); | ||
|
|
@@ -469,9 +497,13 @@ async fn handle_precheck_pvf( | |
|
|
||
| /// Handles PVF execution. | ||
| /// | ||
| /// This will first try to prepare the PVF, if a prepared artifact does not already exist. If there is already a | ||
| /// preparation job, we coalesce the two preparation jobs. When preparing for execution, we use a more lenient timeout | ||
| /// ([`EXECUTE_COMPILATION_TIMEOUT`]) than when prechecking. | ||
| /// This will try to prepare the PVF, if a prepared artifact does not already exist. If there is already a | ||
| /// preparation job, we coalesce the two preparation jobs. | ||
| /// | ||
| /// If the prepare job failed previously, we may retry it under certain conditions. | ||
| /// | ||
| /// When preparing for execution, we use a more lenient timeout ([`EXECUTE_COMPILATION_TIMEOUT`]) | ||
| /// than when prechecking. | ||
| async fn handle_execute_pvf( | ||
| cache_path: &Path, | ||
| artifacts: &mut Artifacts, | ||
|
|
@@ -491,6 +523,7 @@ async fn handle_execute_pvf( | |
| ArtifactState::Prepared { last_time_needed } => { | ||
| *last_time_needed = SystemTime::now(); | ||
|
|
||
| // This artifact has already been prepared, send it to the execute queue. | ||
| send_execute( | ||
| execute_queue, | ||
| execute::ToQueue::Enqueue { | ||
|
|
@@ -502,11 +535,29 @@ async fn handle_execute_pvf( | |
| ) | ||
| .await?; | ||
| }, | ||
| ArtifactState::Preparing { waiting_for_response: _ } => { | ||
| ArtifactState::Preparing { .. } => { | ||
| awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); | ||
| }, | ||
| ArtifactState::FailedToProcess(error) => { | ||
| let _ = result_tx.send(Err(ValidationError::from(error.clone()))); | ||
| ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => { | ||
| if can_retry_prepare_after_failure(*last_time_failed, *num_failures) { | ||
| // If we are allowed to retry the failed prepare job, change the state to | ||
| // Preparing and re-queue this job. | ||
| *state = ArtifactState::Preparing { | ||
| waiting_for_response: Vec::new(), | ||
| num_failures: *num_failures, | ||
| }; | ||
| send_prepare( | ||
| prepare_queue, | ||
| prepare::ToQueue::Enqueue { | ||
| priority, | ||
| pvf, | ||
| compilation_timeout: EXECUTE_COMPILATION_TIMEOUT, | ||
| }, | ||
| ) | ||
| .await?; | ||
| } else { | ||
| let _ = result_tx.send(Err(ValidationError::from(error.clone()))); | ||
| } | ||
| }, | ||
| } | ||
| } else { | ||
|
|
@@ -523,6 +574,7 @@ async fn handle_execute_pvf( | |
| ) | ||
| .await?; | ||
|
|
||
| // Add an execution request that will wait to run after this prepare job has finished. | ||
| awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); | ||
| } | ||
|
|
||
|
|
@@ -543,10 +595,29 @@ async fn handle_heads_up( | |
| ArtifactState::Prepared { last_time_needed, .. } => { | ||
| *last_time_needed = now; | ||
| }, | ||
| ArtifactState::Preparing { waiting_for_response: _ } => { | ||
| ArtifactState::Preparing { .. } => { | ||
| // The artifact is already being prepared, so we don't need to do anything. | ||
| }, | ||
| ArtifactState::FailedToProcess(_) => {}, | ||
| ArtifactState::FailedToProcess { last_time_failed, num_failures, error: _ } => { | ||
| // TODO: Do we want to retry for heads-up requests? | ||
|
||
| if can_retry_prepare_after_failure(*last_time_failed, *num_failures) { | ||
| // If we are allowed to retry the failed prepare job, change the state to | ||
| // Preparing and re-queue this job. | ||
| *state = ArtifactState::Preparing { | ||
| waiting_for_response: vec![], | ||
| num_failures: *num_failures, | ||
| }; | ||
| send_prepare( | ||
| prepare_queue, | ||
| prepare::ToQueue::Enqueue { | ||
| priority: Priority::Normal, | ||
| pvf: active_pvf, | ||
| compilation_timeout: EXECUTE_COMPILATION_TIMEOUT, | ||
| }, | ||
| ) | ||
| .await?; | ||
| } | ||
| }, | ||
| } | ||
| } else { | ||
| // It's not in the artifacts, so we need to enqueue a job to prepare it. | ||
|
|
@@ -596,20 +667,26 @@ async fn handle_prepare_done( | |
| never!("the artifact is already prepared: {:?}", artifact_id); | ||
| return Ok(()) | ||
| }, | ||
| Some(ArtifactState::FailedToProcess(_)) => { | ||
| Some(ArtifactState::FailedToProcess { .. }) => { | ||
| // The reasoning is similar to the above, the artifact cannot be | ||
| // processed at this point. | ||
| never!("the artifact is already processed unsuccessfully: {:?}", artifact_id); | ||
| return Ok(()) | ||
| }, | ||
| Some(state @ ArtifactState::Preparing { waiting_for_response: _ }) => state, | ||
| Some(state @ ArtifactState::Preparing { .. }) => state, | ||
| }; | ||
|
|
||
| if let ArtifactState::Preparing { waiting_for_response } = state { | ||
| let num_failures = if let ArtifactState::Preparing { waiting_for_response, num_failures } = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: I wonder if it's time to move it under the match above? Then,
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I gave it a try, but I wasn't able to work around the compiler errors when I do that. |
||
| state | ||
| { | ||
| for result_sender in waiting_for_response.drain(..) { | ||
| let _ = result_sender.send(result.clone()); | ||
| } | ||
| } | ||
| num_failures | ||
| } else { | ||
| never!("The reasoning is similar to the above, the artifact can only be preparing at this point; qed"); | ||
| return Ok(()) | ||
| }; | ||
|
|
||
| // It's finally time to dispatch all the execution requests that were waiting for this artifact | ||
| // to be prepared. | ||
|
|
@@ -641,7 +718,11 @@ async fn handle_prepare_done( | |
|
|
||
| *state = match result { | ||
| Ok(()) => ArtifactState::Prepared { last_time_needed: SystemTime::now() }, | ||
| Err(error) => ArtifactState::FailedToProcess(error.clone()), | ||
| Err(error) => ArtifactState::FailedToProcess { | ||
| last_time_failed: SystemTime::now(), | ||
| num_failures: *num_failures + 1, | ||
| error: error.clone(), | ||
| }, | ||
| }; | ||
|
|
||
| Ok(()) | ||
|
|
@@ -704,6 +785,12 @@ async fn sweeper_task(mut sweeper_rx: mpsc::Receiver<PathBuf>) { | |
| } | ||
| } | ||
|
|
||
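| /// Check if a failed prepare job may be retried: the cooldown ([`PREPARE_FAILURE_COOLDOWN`]) must have elapsed since the last failure and the number of failures must not exceed [`NUM_PREPARE_RETRIES`]. | ||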
| fn can_retry_prepare_after_failure(last_time_failed: SystemTime, num_failures: u32) -> bool { | ||
| SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN && | ||
| num_failures <= NUM_PREPARE_RETRIES | ||
| } | ||
|
|
||
| /// A stream that yields a pulse continuously at a given interval. | ||
| fn pulse_every(interval: std::time::Duration) -> impl futures::Stream<Item = ()> { | ||
| futures::stream::unfold(interval, { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: tautology. What is this error exactly? The last one I presume?