From 2e3983090ded011a7fdf958a88954669ec36c041 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Tue, 1 Nov 2022 22:31:13 +0100 Subject: [PATCH 1/5] Fix a couple of typos --- node/core/pvf/src/execute/queue.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/node/core/pvf/src/execute/queue.rs b/node/core/pvf/src/execute/queue.rs index 9b240e02df17..b4c6a66b7719 100644 --- a/node/core/pvf/src/execute/queue.rs +++ b/node/core/pvf/src/execute/queue.rs @@ -252,8 +252,8 @@ fn handle_job_finish( "execute worker concluded", ); - // First we send the result. It may fail due the other end of the channel being dropped, that's - // legitimate and we don't treat that as an error. + // First we send the result. It may fail due to the other end of the channel being dropped, + // that's legitimate and we don't treat that as an error. let _ = result_tx.send(result); // Then, we should deal with the worker: @@ -305,7 +305,7 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> Qu Err(err) => { gum::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err); - // Assume that the failure intermittent and retry after a delay. + // Assume that the failure is intermittent and retry after a delay. Delay::new(Duration::from_secs(3)).await; }, } From 2e91ff3b1b17f375a00dcddf83b2dfedf75b3c13 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Thu, 3 Nov 2022 17:26:47 +0100 Subject: [PATCH 2/5] Retry failed PVF execution PVF execution that fails due to AmbiguousWorkerDeath should be retried once. This should reduce the occurrence of failures due to transient conditions. Closes #6195 --- node/core/candidate-validation/src/lib.rs | 58 +++++++++++++++-------- 1 file changed, 38 insertions(+), 20 deletions(-) diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index c3775ba1c453..a6ffc76c0191 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -38,6 +38,7 @@ use polkadot_node_subsystem::{ overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult, SubsystemSender, }; +use polkadot_node_subsystem_util::TimeoutExt; use polkadot_parachain::primitives::{ValidationParams, ValidationResult as WasmValidationResult}; use polkadot_primitives::v2::{ CandidateCommitments, CandidateDescriptor, CandidateReceipt, Hash, OccupiedCoreAssumption, @@ -60,6 +61,9 @@ mod tests; const LOG_TARGET: &'static str = "parachain::candidate-validation"; +// The amount of time to wait before retrying after an AmbiguousWorkerDeath validation error. +const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(1); + /// Configuration for the candidate validation subsystem #[derive(Clone)] pub struct Config { @@ -621,28 +625,21 @@ impl ValidationBackend for ValidationHost { timeout: Duration, params: ValidationParams, ) -> Result { - let (tx, rx) = oneshot::channel(); - if let Err(err) = self - .execute_pvf( - Pvf::from_code(raw_validation_code), - timeout, - params.encode(), - polkadot_node_core_pvf::Priority::Normal, - tx, - ) - .await - { - return Err(ValidationError::InternalError(format!( - "cannot send pvf to the validation host: {:?}", - err - ))) - } + let pvf = Pvf::from_code(raw_validation_code); - let validation_result = rx - .await - .map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?; + let validation_result = execute_pvf_once(self, pvf.clone(), timeout, params.encode()).await; - validation_result + // If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the + // assumption that the conditions that caused this error may have been transient. + if let Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)) = + validation_result + { + // Wait a brief delay before retrying. + let _: Option<()> = future::pending().timeout(PVF_EXECUTION_RETRY_DELAY).await; + execute_pvf_once(self, pvf, timeout, params.encode()).await + } else { + validation_result + } } async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError> { @@ -657,6 +654,27 @@ impl ValidationBackend for ValidationHost { } } +// Tries executing a PVF a single time (no retries). +async fn execute_pvf_once( + host: &mut ValidationHost, + pvf: Pvf, + timeout: Duration, + params: Vec, +) -> Result { + let priority = polkadot_node_core_pvf::Priority::Normal; + + let (tx, rx) = oneshot::channel(); + if let Err(err) = host.execute_pvf(pvf, timeout, params, priority, tx).await { + return Err(ValidationError::InternalError(format!( + "cannot send pvf to the validation host: {:?}", + err + ))) + } + + rx.await + .map_err(|_| ValidationError::InternalError("validation was cancelled".into()))? +} + /// Does basic checks of a candidate. Provide the encoded PoV-block. Returns `Ok` if basic checks /// are passed, `Err` otherwise. fn perform_basic_checks( From e2a15ee8358beb616548adf3727203ae52944d17 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 7 Nov 2022 09:42:43 -0500 Subject: [PATCH 3/5] Address a couple of nits --- Cargo.lock | 1 + node/core/candidate-validation/Cargo.toml | 1 + node/core/candidate-validation/src/lib.rs | 8 +++++--- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e3a07d8afeae..857846e719b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6397,6 +6397,7 @@ dependencies = [ "assert_matches", "async-trait", "futures", + "futures-timer", "parity-scale-codec", "polkadot-node-core-pvf", "polkadot-node-primitives", diff --git a/node/core/candidate-validation/Cargo.toml b/node/core/candidate-validation/Cargo.toml index 8634cfe5a75e..105d7c1a21dc 100644 --- a/node/core/candidate-validation/Cargo.toml +++ b/node/core/candidate-validation/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] async-trait = "0.1.57" futures = "0.3.21" +futures-timer = "3.0.2" gum = { package = "tracing-gum", path = "../../gum" } sp-maybe-compressed-blob = { package = "sp-maybe-compressed-blob", git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index a6ffc76c0191..df21394b9d7e 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -38,7 +38,6 @@ use polkadot_node_subsystem::{ overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult, SubsystemSender, }; -use polkadot_node_subsystem_util::TimeoutExt; use polkadot_parachain::primitives::{ValidationParams, ValidationResult as WasmValidationResult}; use polkadot_primitives::v2::{ CandidateCommitments, CandidateDescriptor, CandidateReceipt, Hash, OccupiedCoreAssumption, @@ -61,8 +60,11 @@ mod tests; const LOG_TARGET: &'static str = "parachain::candidate-validation"; -// The amount of time to wait before retrying after an AmbiguousWorkerDeath validation error. +/// The amount of time to wait before retrying after an AmbiguousWorkerDeath validation error. +#[cfg(not(test))] const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(1); +#[cfg(test)] +const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200); /// Configuration for the candidate validation subsystem #[derive(Clone)] @@ -635,7 +637,7 @@ impl ValidationBackend for ValidationHost { validation_result { // Wait a brief delay before retrying. - let _: Option<()> = future::pending().timeout(PVF_EXECUTION_RETRY_DELAY).await; + futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await; execute_pvf_once(self, pvf, timeout, params.encode()).await } else { validation_result From e4775727d756c8d95410a87131d1ecc7c4744f30 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 7 Nov 2022 11:21:14 -0500 Subject: [PATCH 4/5] Write tests; refactor (add `validate_candidate_with_retry`) --- node/core/candidate-validation/src/lib.rs | 70 +++++----- node/core/candidate-validation/src/tests.rs | 144 ++++++++++++++++++-- 2 files changed, 173 insertions(+), 41 deletions(-) diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index df21394b9d7e..80f2916f3339 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -496,7 +496,7 @@ where } async fn validate_candidate_exhaustive( - mut validation_backend: impl ValidationBackend, + mut validation_backend: impl ValidationBackend + Send, persisted_validation_data: PersistedValidationData, validation_code: ValidationCode, candidate_receipt: CandidateReceipt, @@ -557,7 +557,7 @@ async fn validate_candidate_exhaustive( }; let result = validation_backend - .validate_candidate(raw_validation_code.to_vec(), timeout, params) + .validate_candidate_with_retry(raw_validation_code.to_vec(), timeout, params) .await; if let Err(ref error) = result { @@ -611,25 +611,22 @@ async fn validate_candidate_exhaustive( trait ValidationBackend { async fn validate_candidate( &mut self, - raw_validation_code: Vec, + pvf: Pvf, timeout: Duration, - params: ValidationParams, + encoded_params: Vec, ) -> Result; - async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError>; -} - -#[async_trait] -impl ValidationBackend for ValidationHost { - async fn validate_candidate( + async fn validate_candidate_with_retry( &mut self, raw_validation_code: Vec, timeout: Duration, params: ValidationParams, ) -> Result { + // Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap. let pvf = Pvf::from_code(raw_validation_code); - let validation_result = execute_pvf_once(self, pvf.clone(), timeout, params.encode()).await; + let validation_result = + self.validate_candidate(pvf.clone(), timeout, params.encode()).await; // If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the // assumption that the conditions that caused this error may have been transient. @@ -638,12 +635,40 @@ impl ValidationBackend for ValidationHost { { // Wait a brief delay before retrying. futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await; - execute_pvf_once(self, pvf, timeout, params.encode()).await + // Encode the params again when re-trying. We expect the retry case to be relatively + // rare, and we want to avoid unconditionally cloning data. + self.validate_candidate(pvf, timeout, params.encode()).await } else { validation_result } } + async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError>; +} + +#[async_trait] +impl ValidationBackend for ValidationHost { + /// Tries executing a PVF a single time (no retries). + async fn validate_candidate( + &mut self, + pvf: Pvf, + timeout: Duration, + encoded_params: Vec, + ) -> Result { + let priority = polkadot_node_core_pvf::Priority::Normal; + + let (tx, rx) = oneshot::channel(); + if let Err(err) = self.execute_pvf(pvf, timeout, encoded_params, priority, tx).await { + return Err(ValidationError::InternalError(format!( + "cannot send pvf to the validation host: {:?}", + err + ))) + } + + rx.await + .map_err(|_| ValidationError::InternalError("validation was cancelled".into()))? + } + async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<(), PrepareError> { let (tx, rx) = oneshot::channel(); if let Err(_) = self.precheck_pvf(pvf, tx).await { @@ -656,27 +681,6 @@ impl ValidationBackend for ValidationHost { } } -// Tries executing a PVF a single time (no retries). -async fn execute_pvf_once( - host: &mut ValidationHost, - pvf: Pvf, - timeout: Duration, - params: Vec, -) -> Result { - let priority = polkadot_node_core_pvf::Priority::Normal; - - let (tx, rx) = oneshot::channel(); - if let Err(err) = host.execute_pvf(pvf, timeout, params, priority, tx).await { - return Err(ValidationError::InternalError(format!( - "cannot send pvf to the validation host: {:?}", - err - ))) - } - - rx.await - .map_err(|_| ValidationError::InternalError("validation was cancelled".into()))? -} - /// Does basic checks of a candidate. Provide the encoded PoV-block. Returns `Ok` if basic checks /// are passed, `Err` otherwise. fn perform_basic_checks( diff --git a/node/core/candidate-validation/src/tests.rs b/node/core/candidate-validation/src/tests.rs index ecac13d1440d..cf467cd5c057 100644 --- a/node/core/candidate-validation/src/tests.rs +++ b/node/core/candidate-validation/src/tests.rs @@ -345,12 +345,19 @@ fn check_does_not_match() { } struct MockValidateCandidateBackend { - result: Result, + result_list: Vec>, + num_times_called: usize, } impl MockValidateCandidateBackend { fn with_hardcoded_result(result: Result) -> Self { - Self { result } + Self { result_list: vec![result], num_times_called: 0 } + } + + fn with_hardcoded_result_list( + result_list: Vec>, + ) -> Self { + Self { result_list, num_times_called: 0 } } } @@ -358,11 +365,16 @@ impl MockValidateCandidateBackend { impl ValidationBackend for MockValidateCandidateBackend { async fn validate_candidate( &mut self, - _raw_validation_code: Vec, + _pvf: Pvf, _timeout: Duration, - _params: ValidationParams, + _encoded_params: Vec, ) -> Result { - self.result.clone() + // This is expected to panic if called more times than expected, indicating an error in the + // test. + let result = self.result_list[self.num_times_called].clone(); + self.num_times_called += 1; + + result } async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<(), PrepareError> { @@ -468,7 +480,7 @@ fn candidate_validation_bad_return_is_invalid() { let v = executor::block_on(validate_candidate_exhaustive( MockValidateCandidateBackend::with_hardcoded_result(Err( - ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath), + ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout), )), validation_data, validation_code, @@ -479,6 +491,122 @@ fn candidate_validation_bad_return_is_invalid() { )) .unwrap(); + assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::Timeout)); +} + +#[test] +fn candidate_validation_one_ambiguous_error_is_valid() { + let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; + + let pov = PoV { block_data: BlockData(vec![1; 32]) }; + let head_data = HeadData(vec![1, 1, 1]); + let validation_code = ValidationCode(vec![2; 16]); + + let descriptor = make_valid_candidate_descriptor( + ParaId::from(1_u32), + dummy_hash(), + validation_data.hash(), + pov.hash(), + validation_code.hash(), + head_data.hash(), + dummy_hash(), + Sr25519Keyring::Alice, + ); + + let check = perform_basic_checks( + &descriptor, + validation_data.max_pov_size, + &pov, + &validation_code.hash(), + ); + assert!(check.is_ok()); + + let validation_result = WasmValidationResult { + head_data, + new_validation_code: Some(vec![2, 2, 2].into()), + upward_messages: Vec::new(), + horizontal_messages: Vec::new(), + processed_downward_messages: 0, + hrmp_watermark: 0, + }; + + let commitments = CandidateCommitments { + head_data: validation_result.head_data.clone(), + upward_messages: validation_result.upward_messages.clone(), + horizontal_messages: validation_result.horizontal_messages.clone(), + new_validation_code: validation_result.new_validation_code.clone(), + processed_downward_messages: validation_result.processed_downward_messages, + hrmp_watermark: validation_result.hrmp_watermark, + }; + + let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() }; + + let v = executor::block_on(validate_candidate_exhaustive( + MockValidateCandidateBackend::with_hardcoded_result_list(vec![ + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + Ok(validation_result), + ]), + validation_data.clone(), + validation_code, + candidate_receipt, + Arc::new(pov), + Duration::from_secs(0), + &Default::default(), + )) + .unwrap(); + + assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => { + assert_eq!(outputs.head_data, HeadData(vec![1, 1, 1])); + assert_eq!(outputs.upward_messages, Vec::::new()); + assert_eq!(outputs.horizontal_messages, Vec::new()); + assert_eq!(outputs.new_validation_code, Some(vec![2, 2, 2].into())); + assert_eq!(outputs.hrmp_watermark, 0); + assert_eq!(used_validation_data, validation_data); + }); +} + +#[test] +fn candidate_validation_multiple_ambiguous_errors_is_invalid() { + let validation_data = PersistedValidationData { max_pov_size: 1024, ..Default::default() }; + + let pov = PoV { block_data: BlockData(vec![1; 32]) }; + let validation_code = ValidationCode(vec![2; 16]); + + let descriptor = make_valid_candidate_descriptor( + ParaId::from(1_u32), + dummy_hash(), + validation_data.hash(), + pov.hash(), + validation_code.hash(), + dummy_hash(), + dummy_hash(), + Sr25519Keyring::Alice, + ); + + let check = perform_basic_checks( + &descriptor, + validation_data.max_pov_size, + &pov, + &validation_code.hash(), + ); + assert!(check.is_ok()); + + let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() }; + + let v = executor::block_on(validate_candidate_exhaustive( + MockValidateCandidateBackend::with_hardcoded_result_list(vec![ + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)), + ]), + validation_data, + validation_code, + candidate_receipt, + Arc::new(pov), + Duration::from_secs(0), + &Default::default(), + )) + .unwrap(); + assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::ExecutionError(_))); } @@ -779,9 +907,9 @@ impl MockPreCheckBackend { impl ValidationBackend for MockPreCheckBackend { async fn validate_candidate( &mut self, - _raw_validation_code: Vec, + _pvf: Pvf, _timeout: Duration, - _params: ValidationParams, + _encoded_params: Vec, ) -> Result { unreachable!() } From 705adc74bccf4f26f50971d1d4b064c7a57a55dc Mon Sep 17 00:00:00 2001 From: eskimor Date: Tue, 8 Nov 2022 15:19:11 +0100 Subject: [PATCH 5/5] Update node/core/candidate-validation/src/lib.rs Co-authored-by: Andronik --- node/core/candidate-validation/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index 80f2916f3339..a82a0feb78a0 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -62,7 +62,7 @@ const LOG_TARGET: &'static str = "parachain::candidate-validation"; /// The amount of time to wait before retrying after an AmbiguousWorkerDeath validation error. #[cfg(not(test))] -const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(1); +const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3); #[cfg(test)] const PVF_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);