diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs index 3bc4537cfa74..35a2f701c881 100644 --- a/network/src/protocol/tests.rs +++ b/network/src/protocol/tests.rs @@ -24,7 +24,7 @@ use polkadot_primitives::v0::{ GlobalValidationData, LocalValidationData, SigningContext, PoVBlock, BlockData, ValidationCode, }; -use polkadot_validation::{SharedTable, TableRouter}; +use polkadot_validation::{SharedTable, TableRouter, pipeline::ExecutionMode}; use av_store::Store as AvailabilityStore; use sc_network_gossip::TopicNotification; @@ -298,7 +298,7 @@ fn consensus_instances_cleaned_up() { signing_context, AvailabilityStore::new_in_memory(service.clone()), None, - None, + ExecutionMode::InProcess, )); pool.spawner().spawn_local(worker_task).unwrap(); @@ -329,7 +329,7 @@ fn collation_is_received_with_dropped_router() { signing_context, AvailabilityStore::new_in_memory(service.clone()), None, - None, + ExecutionMode::InProcess, )); pool.spawner().spawn_local(worker_task).unwrap(); @@ -489,7 +489,7 @@ fn fetches_pov_block_from_gossip() { signing_context, AvailabilityStore::new_in_memory(service.clone()), None, - None, + ExecutionMode::InProcess, )); let spawner = pool.spawner(); diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index 0a59ac3daf85..1d040cd60483 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -35,7 +35,7 @@ use polkadot_primitives::v1::{ }; use polkadot_parachain::wasm_executor::{ self, ValidationPool, ExecutionMode, ValidationError, - InvalidCandidate as WasmInvalidCandidate, ValidationExecutionMode, + InvalidCandidate as WasmInvalidCandidate, }; use polkadot_parachain::primitives::{ValidationResult as WasmValidationResult, ValidationParams}; @@ -75,7 +75,7 @@ async fn run( ) -> SubsystemResult<()> { - let pool = ValidationPool::new(ValidationExecutionMode::ExternalProcessSelfHost); + let execution_mode = ExecutionMode::ExternalProcessSelfHost(ValidationPool::new()); loop { match ctx.recv().await? { @@ -90,7 +90,7 @@ async fn run( ) => { let res = spawn_validate_from_chain_state( &mut ctx, - Some(pool.clone()), + execution_mode.clone(), descriptor, pov, spawn.clone(), @@ -110,7 +110,7 @@ async fn run( ) => { let res = spawn_validate_exhaustive( &mut ctx, - Some(pool.clone()), + execution_mode.clone(), omitted_validation, validation_code, descriptor, @@ -217,7 +217,7 @@ async fn check_assumption_validation_data( async fn spawn_validate_from_chain_state( ctx: &mut impl SubsystemContext, - validation_pool: Option, + execution_mode: ExecutionMode, descriptor: CandidateDescriptor, pov: Arc, spawn: impl SpawnNamed + 'static, @@ -258,7 +258,7 @@ async fn spawn_validate_from_chain_state( AssumptionCheckOutcome::Matches(omitted_validation, validation_code) => { return spawn_validate_exhaustive( ctx, - validation_pool, + execution_mode, omitted_validation, validation_code, descriptor, @@ -279,7 +279,7 @@ async fn spawn_validate_from_chain_state( AssumptionCheckOutcome::Matches(omitted_validation, validation_code) => { return spawn_validate_exhaustive( ctx, - validation_pool, + execution_mode, omitted_validation, validation_code, descriptor, @@ -299,7 +299,7 @@ async fn spawn_validate_from_chain_state( async fn spawn_validate_exhaustive( ctx: &mut impl SubsystemContext, - validation_pool: Option, + execution_mode: ExecutionMode, omitted_validation: OmittedValidationData, validation_code: ValidationCode, descriptor: CandidateDescriptor, @@ -309,7 +309,7 @@ async fn spawn_validate_exhaustive( let (tx, rx) = oneshot::channel(); let fut = async move { let res = validate_candidate_exhaustive::( - validation_pool, + execution_mode, omitted_validation, validation_code, descriptor, @@ -386,22 +386,18 @@ trait ValidationBackend { struct RealValidationBackend; impl ValidationBackend for RealValidationBackend { - type Arg = Option; + type Arg = ExecutionMode; fn validate( - pool: Option, + execution_mode: ExecutionMode, validation_code: &ValidationCode, params: ValidationParams, spawn: S, ) -> Result { - let execution_mode = pool.as_ref() - .map(ExecutionMode::Remote) - .unwrap_or(ExecutionMode::Local); - wasm_executor::validate_candidate( &validation_code.0, params, - execution_mode, + &execution_mode, spawn, ) } diff --git a/parachain/Cargo.toml b/parachain/Cargo.toml index abfda8010962..5ee0a05c699e 100644 --- a/parachain/Cargo.toml +++ b/parachain/Cargo.toml @@ -19,7 +19,7 @@ polkadot-core-primitives = { path = "../core-primitives", default-features = fal derive_more = { version = "0.99.2", optional = true } serde = { version = "1.0.102", default-features = false, features = [ "derive" ], optional = true } sp-externalities = { git = "https://github.com/paritytech/substrate", branch = "rococo-branch", optional = true } -sc-executor = { git = "https://github.com/paritytech/substrate", branch = "rococo-branch", optional = true } +sc-executor = { git = "https://github.com/paritytech/substrate", branch = "rococo-branch", optional = true, features = ["wasmtime"] } sp-io = { git = "https://github.com/paritytech/substrate", branch = "rococo-branch", optional = true } parking_lot = { version = "0.10.0", optional = true } log = { version = "0.4.8", optional = true } diff --git a/parachain/src/wasm_executor/mod.rs b/parachain/src/wasm_executor/mod.rs index d5890e2a878e..99b704129b87 100644 --- a/parachain/src/wasm_executor/mod.rs +++ b/parachain/src/wasm_executor/mod.rs @@ -20,7 +20,7 @@ //! Assuming the parameters are correct, this module provides a wrapper around //! a WASM VM for re-execution of a parachain candidate. -use std::any::{TypeId, Any}; +use std::{any::{TypeId, Any}, path::PathBuf}; use crate::primitives::{ValidationParams, ValidationResult}; use codec::{Decode, Encode}; use sp_core::{storage::ChildInfo, traits::{CallInWasm, SpawnNamed}}; @@ -28,7 +28,7 @@ use sp_externalities::Extensions; use sp_wasm_interface::HostFunctions as _; #[cfg(not(any(target_os = "android", target_os = "unknown")))] -pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC, ValidationExecutionMode}; +pub use validation_host::{run_worker, ValidationPool, EXECUTION_TIMEOUT_SEC, WORKER_ARGS}; mod validation_host; @@ -58,16 +58,29 @@ pub fn run_worker(_: &str) -> Result<(), String> { Err("Cannot run validation worker on this platform".to_string()) } -/// WASM code execution mode. -/// -/// > Note: When compiling for WASM, the `Remote` variants are not available. -pub enum ExecutionMode<'a> { - /// Execute in-process. The execution can not be interrupted or aborted. - Local, - /// Remote execution in a spawned process. - Remote(&'a ValidationPool), +/// The execution mode for the `ValidationPool`. +#[derive(Clone)] +#[cfg_attr(not(any(target_os = "android", target_os = "unknown")), derive(Debug))] +pub enum ExecutionMode { + /// The validation worker is ran in a thread inside the same process. + InProcess, + /// The validation worker is ran using the process' executable and the subcommand `validation-worker` is passed + /// following by the address of the shared memory. + ExternalProcessSelfHost(ValidationPool), + /// The validation worker is ran using the command provided and the argument provided. The address of the shared + /// memory is added at the end of the arguments. + ExternalProcessCustomHost { + /// Validation pool. + pool: ValidationPool, + /// Path to the validation worker. The file must exists and be executable. + binary: PathBuf, + /// List of arguments passed to the validation worker. The address of the shared memory will be automatically + /// added after the arguments. + args: Vec, + }, } + #[derive(Debug, derive_more::Display, derive_more::From)] /// Candidate validation error. pub enum ValidationError { @@ -132,19 +145,24 @@ impl std::error::Error for ValidationError { pub fn validate_candidate( validation_code: &[u8], params: ValidationParams, - options: ExecutionMode<'_>, + execution_mode: &ExecutionMode, spawner: impl SpawnNamed + 'static, ) -> Result { - match options { - ExecutionMode::Local => { + match execution_mode { + ExecutionMode::InProcess => { validate_candidate_internal(validation_code, ¶ms.encode(), spawner) }, #[cfg(not(any(target_os = "android", target_os = "unknown")))] - ExecutionMode::Remote(pool) => { + ExecutionMode::ExternalProcessSelfHost(pool) => { pool.validate_candidate(validation_code, params) }, + #[cfg(not(any(target_os = "android", target_os = "unknown")))] + ExecutionMode::ExternalProcessCustomHost { pool, binary, args } => { + let args: Vec<&str> = args.iter().map(|x| x.as_str()).collect(); + pool.validate_candidate_custom(validation_code, params, binary, &args) + }, #[cfg(any(target_os = "android", target_os = "unknown"))] - ExecutionMode::Remote(_pool) => + ExecutionMode::ExternalProcessSelfHost(_) | ExecutionMode::ExternalProcessCustomHost { .. } => Err(ValidationError::Internal(InternalError::System( Box::::from( "Remote validator not available".to_string() diff --git a/parachain/src/wasm_executor/validation_host.rs b/parachain/src/wasm_executor/validation_host.rs index 249ad964670b..07367d3318fe 100644 --- a/parachain/src/wasm_executor/validation_host.rs +++ b/parachain/src/wasm_executor/validation_host.rs @@ -29,9 +29,9 @@ use log::{debug, trace}; use futures::executor::ThreadPool; use sp_core::traits::SpawnNamed; -/// CLI Argument to start in validation worker mode. const WORKER_ARG: &'static str = "validation-worker"; -const WORKER_ARGS: &[&'static str] = &[WORKER_ARG]; +/// CLI Argument to start in validation worker mode. +pub const WORKER_ARGS: &[&'static str] = &[WORKER_ARG]; /// Execution timeout in seconds; #[cfg(debug_assertions)] @@ -65,60 +65,60 @@ impl SpawnNamed for TaskExecutor { } } -/// The execution mode for the `ValidationPool`. -#[derive(Debug, Clone)] -pub enum ValidationExecutionMode { - /// The validation worker is ran in a thread inside the same process. - InProcess, - /// The validation worker is ran using the process' executable and the subcommand `validation-worker` is passed - /// following by the address of the shared memory. - ExternalProcessSelfHost, - /// The validation worker is ran using the command provided and the argument provided. The address of the shared - /// memory is added at the end of the arguments. - ExternalProcessCustomHost { - /// Path to the validation worker. The file must exists and be executable. - binary: PathBuf, - /// List of arguments passed to the validation worker. The address of the shared memory will be automatically - /// added after the arguments. - args: Vec, - }, -} - /// A pool of hosts. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ValidationPool { hosts: Arc>>, - execution_mode: ValidationExecutionMode, } const DEFAULT_NUM_HOSTS: usize = 8; impl ValidationPool { /// Creates a validation pool with the default configuration. - pub fn new(execution_mode: ValidationExecutionMode) -> ValidationPool { + pub fn new() -> ValidationPool { ValidationPool { hosts: Arc::new((0..DEFAULT_NUM_HOSTS).map(|_| Default::default()).collect()), - execution_mode, } } - /// Validate a candidate under the given validation code using the next - /// free validation host. + /// Validate a candidate under the given validation code using the next free validation host. /// /// This will fail if the validation code is not a proper parachain validation module. + /// + /// This function will use `std::env::current_exe()` with the default arguments [`WORKER_ARGS`] to run the worker. pub fn validate_candidate( &self, validation_code: &[u8], params: ValidationParams, + ) -> Result { + self.validate_candidate_custom( + validation_code, + params, + &env::current_exe().map_err(|err| ValidationError::Internal(err.into()))?, + WORKER_ARGS, + ) + } + + /// Validate a candidate under the given validation code using the next free validation host. + /// + /// This will fail if the validation code is not a proper parachain validation module. + /// + /// This function will use the command and the arguments provided in the function's arguments to run the worker. + pub fn validate_candidate_custom( + &self, + validation_code: &[u8], + params: ValidationParams, + command: &PathBuf, + args: &[&str], ) -> Result { for host in self.hosts.iter() { if let Some(mut host) = host.try_lock() { - return host.validate_candidate(validation_code, params, self.execution_mode.clone()); + return host.validate_candidate(validation_code, params, command, args) } } // all workers are busy, just wait for the first one - self.hosts[0].lock().validate_candidate(validation_code, params, self.execution_mode.clone()) + self.hosts[0].lock().validate_candidate(validation_code, params, command, args) } } @@ -224,11 +224,32 @@ enum ValidationResultHeader { unsafe impl Send for ValidationHost {} -#[derive(Default)] +struct ValidationHostMemory(SharedMem); + +impl std::fmt::Debug for ValidationHostMemory { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "ValidationHostMemory") + } +} + +impl std::ops::Deref for ValidationHostMemory { + type Target = SharedMem; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for ValidationHostMemory { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +#[derive(Default, Debug)] struct ValidationHost { worker: Option, - worker_thread: Option>>, - memory: Option, + memory: Option, id: u32, } @@ -253,7 +274,7 @@ impl ValidationHost { Ok(mem_config.create()?) } - fn start_worker(&mut self, execution_mode: ValidationExecutionMode) -> Result<(), InternalError> { + fn start_worker(&mut self, cmd: &PathBuf, args: &[&str]) -> Result<(), InternalError> { if let Some(ref mut worker) = self.worker { // Check if still alive if let Ok(None) = worker.try_wait() { @@ -261,44 +282,23 @@ impl ValidationHost { return Ok(()); } } - if self.worker_thread.is_some() { - return Ok(()); - } let memory = Self::create_memory()?; - let mut run_worker_process = |cmd: PathBuf, args: Vec| -> Result<(), std::io::Error> { - debug!("Starting worker at {:?} with arguments: {:?} and {:?}", cmd, args, memory.get_os_path()); - let worker = process::Command::new(cmd) - .args(args) - .arg(memory.get_os_path()) - .stdin(process::Stdio::piped()) - .spawn()?; - self.id = worker.id(); - self.worker = Some(worker); - Ok(()) - }; - - match execution_mode { - ValidationExecutionMode::InProcess => { - let mem_id = memory.get_os_path().to_string(); - self.worker_thread = Some(std::thread::spawn(move || run_worker(mem_id.as_str()))); - }, - ValidationExecutionMode::ExternalProcessSelfHost => run_worker_process( - env::current_exe()?, - WORKER_ARGS.iter().map(|x| x.to_string()).collect(), - )?, - ValidationExecutionMode::ExternalProcessCustomHost { binary, args } => run_worker_process( - binary, - args, - )?, - }; + debug!("Starting worker at {:?} with arguments: {:?} and {:?}", cmd, args, memory.get_os_path()); + let worker = process::Command::new(cmd) + .args(args) + .arg(memory.get_os_path()) + .stdin(process::Stdio::piped()) + .spawn()?; + self.id = worker.id(); + self.worker = Some(worker); memory.wait( Event::WorkerReady as usize, shared_memory::Timeout::Sec(EXECUTION_TIMEOUT_SEC as usize), )?; - self.memory = Some(memory); + self.memory = Some(ValidationHostMemory(memory)); Ok(()) } @@ -309,13 +309,14 @@ impl ValidationHost { &mut self, validation_code: &[u8], params: ValidationParams, - execution_mode: ValidationExecutionMode, + binary: &PathBuf, + args: &[&str], ) -> Result { if validation_code.len() > MAX_CODE_MEM { return Err(ValidationError::InvalidCandidate(InvalidCandidate::CodeTooLarge(validation_code.len()))); } // First, check if need to spawn the child process - self.start_worker(execution_mode)?; + self.start_worker(binary, args)?; let memory = self.memory.as_mut() .expect("memory is always `Some` after `start_worker` completes successfully"); { diff --git a/parachain/test-parachains/tests/adder/mod.rs b/parachain/test-parachains/tests/adder/mod.rs index 975cb8d5ae47..6df82b70350d 100644 --- a/parachain/test-parachains/tests/adder/mod.rs +++ b/parachain/test-parachains/tests/adder/mod.rs @@ -25,7 +25,7 @@ use parachain::{ HeadData as GenericHeadData, ValidationParams, }, - wasm_executor::{ValidationPool, ValidationExecutionMode} + wasm_executor::{ValidationPool, ExecutionMode} }; use codec::{Decode, Encode}; @@ -57,28 +57,27 @@ fn hash_head(head: &HeadData) -> [u8; 32] { tiny_keccak::keccak256(head.encode().as_slice()) } -fn validation_pool() -> ValidationPool { - let execution_mode = ValidationExecutionMode::ExternalProcessCustomHost { +fn execution_mode() -> ExecutionMode { + ExecutionMode::ExternalProcessCustomHost { + pool: ValidationPool::new(), binary: std::env::current_exe().unwrap(), args: WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(), - }; - - ValidationPool::new(execution_mode) + } } #[test] fn execute_good_on_parent_with_inprocess_validation() { - let pool = ValidationPool::new(ValidationExecutionMode::InProcess); - execute_good_on_parent(pool); + let execution_mode = ExecutionMode::InProcess; + execute_good_on_parent(execution_mode); } #[test] pub fn execute_good_on_parent_with_external_process_validation() { - let pool = validation_pool(); - execute_good_on_parent(pool); + let execution_mode = execution_mode(); + execute_good_on_parent(execution_mode); } -fn execute_good_on_parent(pool: ValidationPool) { +fn execute_good_on_parent(execution_mode: ExecutionMode) { let parent_head = HeadData { number: 0, parent_hash: [0; 32], @@ -101,7 +100,7 @@ fn execute_good_on_parent(pool: ValidationPool) { relay_chain_height: 1, code_upgrade_allowed: None, }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode, sp_core::testing::TaskExecutor::new(), ).unwrap(); @@ -117,7 +116,7 @@ fn execute_good_chain_on_parent() { let mut number = 0; let mut parent_hash = [0; 32]; let mut last_state = 0; - let pool = validation_pool(); + let execution_mode = execution_mode(); for add in 0..10 { let parent_head = HeadData { @@ -141,7 +140,7 @@ fn execute_good_chain_on_parent() { relay_chain_height: number as RelayChainBlockNumber + 1, code_upgrade_allowed: None, }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode, sp_core::testing::TaskExecutor::new(), ).unwrap(); @@ -159,7 +158,7 @@ fn execute_good_chain_on_parent() { #[test] fn execute_bad_on_parent() { - let pool = validation_pool(); + let execution_mode = execution_mode(); let parent_head = HeadData { number: 0, @@ -182,7 +181,7 @@ fn execute_bad_on_parent() { relay_chain_height: 1, code_upgrade_allowed: None, }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode, sp_core::testing::TaskExecutor::new(), ).unwrap_err(); } diff --git a/parachain/test-parachains/tests/code_upgrader/mod.rs b/parachain/test-parachains/tests/code_upgrader/mod.rs index e2ec3d9fede0..4e8f5bd8ce23 100644 --- a/parachain/test-parachains/tests/code_upgrader/mod.rs +++ b/parachain/test-parachains/tests/code_upgrader/mod.rs @@ -16,7 +16,7 @@ //! Basic parachain that adds a number as part of its state. -use parachain::wasm_executor::{ValidationPool, ValidationExecutionMode}; +use parachain::wasm_executor::{ValidationPool, ExecutionMode}; use parachain::primitives::{ BlockData as GenericBlockData, HeadData as GenericHeadData, @@ -27,18 +27,17 @@ use code_upgrader::{hash_state, HeadData, BlockData, State}; const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"]; -fn validation_pool() -> ValidationPool { - let execution_mode = ValidationExecutionMode::ExternalProcessCustomHost { +fn execution_mode() -> ExecutionMode { + ExecutionMode::ExternalProcessCustomHost { + pool: ValidationPool::new(), binary: std::env::current_exe().unwrap(), args: WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(), - }; - - ValidationPool::new(execution_mode) + } } #[test] pub fn execute_good_no_upgrade() { - let pool = validation_pool(); + let execution_mode = execution_mode(); let parent_head = HeadData { number: 0, @@ -61,7 +60,7 @@ pub fn execute_good_no_upgrade() { relay_chain_height: 1, code_upgrade_allowed: None, }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode, sp_core::testing::TaskExecutor::new(), ).unwrap(); @@ -75,7 +74,7 @@ pub fn execute_good_no_upgrade() { #[test] pub fn execute_good_with_upgrade() { - let pool = validation_pool(); + let execution_mode = execution_mode(); let parent_head = HeadData { number: 0, @@ -98,7 +97,7 @@ pub fn execute_good_with_upgrade() { relay_chain_height: 1, code_upgrade_allowed: Some(20), }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode, sp_core::testing::TaskExecutor::new(), ).unwrap(); @@ -119,7 +118,7 @@ pub fn execute_good_with_upgrade() { #[test] #[should_panic] pub fn code_upgrade_not_allowed() { - let pool = validation_pool(); + let execution_mode = execution_mode(); let parent_head = HeadData { number: 0, @@ -142,14 +141,14 @@ pub fn code_upgrade_not_allowed() { relay_chain_height: 1, code_upgrade_allowed: None, }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode, sp_core::testing::TaskExecutor::new(), ).unwrap(); } #[test] pub fn applies_code_upgrade_after_delay() { - let pool = validation_pool(); + let execution_mode = execution_mode(); let (new_head, state) = { let parent_head = HeadData { @@ -173,7 +172,7 @@ pub fn applies_code_upgrade_after_delay() { relay_chain_height: 1, code_upgrade_allowed: Some(2), }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode, sp_core::testing::TaskExecutor::new(), ).unwrap(); @@ -209,7 +208,7 @@ pub fn applies_code_upgrade_after_delay() { relay_chain_height: 2, code_upgrade_allowed: None, }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode, sp_core::testing::TaskExecutor::new(), ).unwrap(); diff --git a/parachain/test-parachains/tests/wasm_executor/mod.rs b/parachain/test-parachains/tests/wasm_executor/mod.rs index ce17412bff13..ad3ceacd1881 100644 --- a/parachain/test-parachains/tests/wasm_executor/mod.rs +++ b/parachain/test-parachains/tests/wasm_executor/mod.rs @@ -21,21 +21,20 @@ const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"]; use crate::adder; use parachain::{ primitives::{BlockData, ValidationParams}, - wasm_executor::{ValidationError, InvalidCandidate, EXECUTION_TIMEOUT_SEC, ValidationExecutionMode, ValidationPool}, + wasm_executor::{ValidationError, InvalidCandidate, EXECUTION_TIMEOUT_SEC, ExecutionMode, ValidationPool}, }; -fn validation_pool() -> ValidationPool { - let execution_mode = ValidationExecutionMode::ExternalProcessCustomHost { +fn execution_mode() -> ExecutionMode { + ExecutionMode::ExternalProcessCustomHost { + pool: ValidationPool::new(), binary: std::env::current_exe().unwrap(), args: WORKER_ARGS_TEST.iter().map(|x| x.to_string()).collect(), - }; - - ValidationPool::new(execution_mode) + } } #[test] fn terminates_on_timeout() { - let pool = validation_pool(); + let execution_mode = execution_mode(); let result = parachain::wasm_executor::validate_candidate( halt::wasm_binary_unwrap(), @@ -47,7 +46,7 @@ fn terminates_on_timeout() { relay_chain_height: 1, code_upgrade_allowed: None, }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode, sp_core::testing::TaskExecutor::new(), ); match result { @@ -61,11 +60,11 @@ fn terminates_on_timeout() { #[test] fn parallel_execution() { - let pool = validation_pool(); + let execution_mode = execution_mode(); let start = std::time::Instant::now(); - let pool2 = pool.clone(); + let execution_mode2 = execution_mode.clone(); let thread = std::thread::spawn(move || parachain::wasm_executor::validate_candidate( halt::wasm_binary_unwrap(), @@ -77,7 +76,7 @@ fn parallel_execution() { relay_chain_height: 1, code_upgrade_allowed: None, }, - parachain::wasm_executor::ExecutionMode::Remote(&pool2), + &execution_mode, sp_core::testing::TaskExecutor::new(), ).ok()); let _ = parachain::wasm_executor::validate_candidate( @@ -90,7 +89,7 @@ fn parallel_execution() { relay_chain_height: 1, code_upgrade_allowed: None, }, - parachain::wasm_executor::ExecutionMode::Remote(&pool), + &execution_mode2, sp_core::testing::TaskExecutor::new(), ); thread.join().unwrap(); diff --git a/service/src/lib.rs b/service/src/lib.rs index 40c9a54a4a87..b39722101e48 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -31,8 +31,6 @@ use sc_executor::native_executor_instance; use log::info; use sp_trie::PrefixedMemoryDB; use prometheus_endpoint::Registry; -#[cfg(feature = "full-node")] -use consensus::pipeline::ValidationExecutionMode; pub use service::{ Role, PruningMode, TransactionPoolOptions, Error, RuntimeGenesis, RpcHandlers, TFullClient, TLightClient, TFullBackend, TLightBackend, TFullCallExecutor, TLightCallExecutor, @@ -413,11 +411,7 @@ pub fn new_full( select_chain: select_chain.clone(), keystore: keystore.clone(), max_block_data_size, - validation_execution_mode: if test { - ValidationExecutionMode::InProcess - } else { - ValidationExecutionMode::ExternalProcessSelfHost - }, + local_validation: test, }.build(); task_manager.spawn_essential_handle().spawn("validation-service", Box::pin(validation_service)); diff --git a/validation/src/collation.rs b/validation/src/collation.rs index 53e18ab3abb5..75d54c5fd119 100644 --- a/validation/src/collation.rs +++ b/validation/src/collation.rs @@ -58,7 +58,7 @@ pub trait Collators: Clone { /// A future which resolves when a collation is available. pub async fn collation_fetch( - validation_pool: Option, + execution_mode: crate::pipeline::ExecutionMode, parachain: ParaId, relay_parent: Hash, collators: C, @@ -77,7 +77,7 @@ pub async fn collation_fetch( let collation = collators.collate(parachain, relay_parent).await?; let Collation { info, pov } = collation; let res = crate::pipeline::full_output_validation_with_api( - validation_pool.as_ref(), + &execution_mode, &*client, &info, &pov, diff --git a/validation/src/pipeline.rs b/validation/src/pipeline.rs index 51a31ec3070a..2a818ec10dfd 100644 --- a/validation/src/pipeline.rs +++ b/validation/src/pipeline.rs @@ -26,7 +26,7 @@ use polkadot_primitives::v0::{ }; use polkadot_primitives::v0::{Block, BlockId, Balance, Hash}; use parachain::{ - wasm_executor::{self, ExecutionMode}, + wasm_executor, primitives::{UpwardMessage, ValidationParams}, }; use runtime_primitives::traits::{BlakeTwo256, Hash as HashT}; @@ -34,7 +34,7 @@ use sp_api::ProvideRuntimeApi; use crate::Error; use primitives::traits::SpawnNamed; -pub use parachain::wasm_executor::{ValidationPool, ValidationExecutionMode}; +pub use parachain::wasm_executor::{ExecutionMode, ValidationPool}; /// Does basic checks of a collation. Provide the encoded PoV-block. pub fn basic_checks( @@ -188,7 +188,7 @@ fn validate_upward_messages( /// Does full checks of a collation, with provided PoV-block and contextual data. pub fn validate<'a>( - validation_pool: Option<&'_ ValidationPool>, + execution_mode: &'a ExecutionMode, collation: &'a CollationInfo, pov_block: &'a PoVBlock, local_validation: &'a LocalValidationData, @@ -218,10 +218,6 @@ pub fn validate<'a>( per_byte: 0, }; - let execution_mode = validation_pool - .map(ExecutionMode::Remote) - .unwrap_or(ExecutionMode::Local); - match wasm_executor::validate_candidate( &validation_code.0, params, @@ -276,7 +272,7 @@ where /// Does full-pipeline validation of a collation with provided contextual parameters. pub fn full_output_validation_with_api

( - validation_pool: Option<&ValidationPool>, + execution_mode: &ExecutionMode, api: &P, collation: &CollationInfo, pov_block: &PoVBlock, @@ -303,7 +299,7 @@ pub fn full_output_validation_with_api

( ) .and_then(|()| { let res = validate( - validation_pool, + execution_mode, &collation, &pov_block, &local_validation, diff --git a/validation/src/shared_table/mod.rs b/validation/src/shared_table/mod.rs index 383306ce28f5..bb0e66286ee9 100644 --- a/validation/src/shared_table/mod.rs +++ b/validation/src/shared_table/mod.rs @@ -40,7 +40,7 @@ use self::includable::IncludabilitySender; use primitives::{Pair, traits::SpawnNamed}; use sp_api::ProvideRuntimeApi; -use crate::pipeline::{FullOutput, ValidationPool}; +use crate::pipeline::{ExecutionMode, FullOutput}; use crate::Error; mod includable; @@ -147,7 +147,7 @@ struct SharedTableInner { trackers: Vec, availability_store: AvailabilityStore, validated: HashMap, - validation_pool: Option, + execution_mode: ExecutionMode, } impl SharedTableInner { @@ -209,7 +209,7 @@ impl SharedTableInner { }; work.map(|work| ParachainWork { - validation_pool: self.validation_pool.clone(), + execution_mode: self.execution_mode.clone(), availability_store: self.availability_store.clone(), relay_parent: context.signing_context.parent_hash.clone(), work, @@ -276,7 +276,7 @@ impl Validated { /// Future that performs parachain validation work. pub struct ParachainWork { - validation_pool: Option, + execution_mode: ExecutionMode, work: Work, relay_parent: Hash, availability_store: AvailabilityStore, @@ -304,11 +304,11 @@ impl ParachainWork { let n_validators = self.n_validators; let expected_relay_parent = self.relay_parent; - let pool = self.validation_pool.clone(); + let execution_mode = self.execution_mode.clone(); let validate = move |pov_block: &PoVBlock, candidate: &AbridgedCandidateReceipt| { let collation_info = candidate.to_collation_info(); let full_output = crate::pipeline::full_output_validation_with_api( - pool.as_ref(), + &execution_mode, &*api, &collation_info, pov_block, @@ -440,7 +440,7 @@ impl SharedTable { signing_context: SigningContext, availability_store: AvailabilityStore, max_block_data_size: Option, - validation_pool: Option, + execution_mode: ExecutionMode, ) -> Self { SharedTable { context: Arc::new(TableContext { groups, key, signing_context, validators: validators.clone(), }), @@ -450,7 +450,7 @@ impl SharedTable { validated: HashMap::new(), trackers: Vec::new(), availability_store, - validation_pool, + execution_mode, })) } } @@ -711,7 +711,7 @@ mod tests { signing_context.clone(), AvailabilityStore::new_in_memory(DummyErasureNetworking), None, - None, + ExecutionMode::InProcess, ); let mut candidate = AbridgedCandidateReceipt::default(); @@ -768,7 +768,7 @@ mod tests { signing_context.clone(), AvailabilityStore::new_in_memory(DummyErasureNetworking), None, - None, + ExecutionMode::InProcess, ); let mut candidate = AbridgedCandidateReceipt::default(); @@ -826,7 +826,7 @@ mod tests { availability_store: store.clone(), max_block_data_size: None, n_validators, - validation_pool: None, + execution_mode: ExecutionMode::InProcess, }; for i in 0..n_validators { @@ -896,7 +896,7 @@ mod tests { availability_store: store.clone(), max_block_data_size: None, n_validators, - validation_pool: None, + execution_mode: ExecutionMode::InProcess, }; let validated = block_on(producer.prime_with(|_, _| Ok( @@ -951,7 +951,7 @@ mod tests { signing_context.clone(), AvailabilityStore::new_in_memory(DummyErasureNetworking), None, - None, + ExecutionMode::InProcess, ); let mut candidate = AbridgedCandidateReceipt::default(); @@ -1019,7 +1019,7 @@ mod tests { signing_context.clone(), AvailabilityStore::new_in_memory(DummyErasureNetworking), None, - None, + ExecutionMode::InProcess, ); let mut candidate = AbridgedCandidateReceipt::default(); diff --git a/validation/src/validation_service/mod.rs b/validation/src/validation_service/mod.rs index e06d6a222a6a..410ceb1e728c 100644 --- a/validation/src/validation_service/mod.rs +++ b/validation/src/validation_service/mod.rs @@ -48,7 +48,7 @@ use log::{warn, info, debug, trace}; use super::{Network, Collators, SharedTable, TableRouter}; use crate::Error; -use crate::pipeline::{ValidationPool, ValidationExecutionMode}; +use crate::pipeline::{ExecutionMode, ValidationPool}; // Remote processes may request for a validation instance to be cloned or instantiated. // They send a oneshot channel. @@ -132,8 +132,8 @@ pub struct ServiceBuilder { pub keystore: KeyStorePtr, /// The maximum block-data size in bytes. pub max_block_data_size: Option, - /// Validation execution mode. - pub validation_execution_mode: ValidationExecutionMode, + /// The validation is ran in the same process. + pub local_validation: bool, } impl ServiceBuilder where @@ -165,17 +165,21 @@ impl ServiceBuilder where NotifyImport(sc_client_api::BlockImportNotification), } - let validation_pool = Some(ValidationPool::new(self.validation_execution_mode)); + let execution_mode = if self.local_validation { + ExecutionMode::InProcess + } else { + ExecutionMode::ExternalProcessSelfHost(ValidationPool::new()) + }; let mut parachain_validation = ParachainValidationInstances { client: self.client.clone(), network: self.network, spawner: self.spawner.clone(), availability_store: self.availability_store, live_instances: HashMap::new(), - validation_pool: validation_pool.clone(), + execution_mode: execution_mode.clone(), collation_fetch: DefaultCollationFetch { collators: self.collators, - validation_pool, + execution_mode, spawner: self.spawner, }, }; @@ -261,7 +265,7 @@ pub(crate) trait CollationFetch { #[derive(Clone)] struct DefaultCollationFetch { collators: C, - validation_pool: Option, + execution_mode: ExecutionMode, spawner: S, } @@ -286,7 +290,7 @@ impl CollationFetch for DefaultCollationFetch P: ProvideRuntimeApi + Send + Sync + 'static, { crate::collation::collation_fetch( - self.validation_pool, + self.execution_mode, parachain, relay_parent, self.collators, @@ -331,9 +335,8 @@ pub(crate) struct ParachainValidationInstances { /// Live agreements. Maps relay chain parent hashes to attestation /// instances. live_instances: HashMap>, - /// The underlying validation pool of processes to use. - /// Only `None` in tests. - validation_pool: Option, + /// The underlying validation execution mode. + execution_mode: ExecutionMode, /// Used to fetch a collation. collation_fetch: CF, } @@ -439,7 +442,7 @@ impl ParachainValidationInstances where signing_context, self.availability_store.clone(), max_block_data_size, - self.validation_pool.clone(), + self.execution_mode.clone(), )); // The router will join the consensus gossip network. This is important @@ -746,7 +749,7 @@ mod tests { spawner: executor.clone(), availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking), live_instances: HashMap::new(), - validation_pool: None, + execution_mode: ExecutionMode::InProcess, }; executor::block_on(parachain_validation.get_or_instantiate(Default::default(), &keystore, None)) @@ -786,7 +789,7 @@ mod tests { spawner: executor.clone(), availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking), live_instances: HashMap::new(), - validation_pool: None, + execution_mode: ExecutionMode::InProcess, }; executor::block_on(parachain_validation.get_or_instantiate(Default::default(), &keystore, None))