diff --git a/Cargo.lock b/Cargo.lock index ad7f7441dad5..8424e764eca6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1344,12 +1344,12 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.5" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" +checksum = "8ff1f980957787286a554052d03c7aee98d99cc32e09f6d45f0a814133c87978" dependencies = [ "cfg-if 1.0.0", - "lazy_static", + "once_cell", ] [[package]] @@ -6764,6 +6764,7 @@ dependencies = [ "polkadot-node-subsystem-util", "polkadot-parachain", "rand 0.8.5", + "rayon", "sc-executor", "sc-executor-common", "sc-executor-wasmtime", diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml index 46f35aa498cc..3e72a3106f52 100644 --- a/node/core/pvf/Cargo.toml +++ b/node/core/pvf/Cargo.toml @@ -20,6 +20,7 @@ gum = { package = "tracing-gum", path = "../../gum" } pin-project = "1.0.9" rand = "0.8.5" tempfile = "3.3.0" +rayon = "1.5.1" parity-scale-codec = { version = "3.1.2", default-features = false, features = ["derive"] } polkadot-parachain = { path = "../../../parachain" } polkadot-core-primitives = { path = "../../../core-primitives" } diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs index 6b52a3f1369a..4aca2da4b3ba 100644 --- a/node/core/pvf/src/error.rs +++ b/node/core/pvf/src/error.rs @@ -15,6 +15,7 @@ // along with Polkadot. If not, see <http://www.gnu.org/licenses/>. use parity_scale_codec::{Decode, Encode}; +use std::any::Any; /// Result of PVF preparation performed by the validation host. pub type PrepareResult = Result<(), PrepareError>; @@ -108,3 +109,17 @@ impl From for ValidationError { } } } + +/// Attempt to convert an opaque panic payload to a string. +/// +/// This is a best effort, and is not guaranteed to provide the most accurate value. 
+pub(crate) fn stringify_panic_payload(payload: Box) -> String { + match payload.downcast::<&'static str>() { + Ok(msg) => msg.to_string(), + Err(payload) => match payload.downcast::() { + Ok(msg) => *msg, + // At least we tried... + Err(_) => "unknown panic payload".to_string(), + }, + } +} diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index 01bc59cb1a84..a0b8337ddc4a 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -16,7 +16,7 @@ use crate::{ artifacts::ArtifactPathId, - executor_intf::TaskExecutor, + executor_intf::Executor, worker_common::{ bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, @@ -184,8 +184,8 @@ impl Response { /// the path to the socket used to communicate with the host. pub fn worker_entrypoint(socket_path: &str) { worker_event_loop("execute", socket_path, |mut stream| async move { - let executor = TaskExecutor::new().map_err(|e| { - io::Error::new(io::ErrorKind::Other, format!("cannot create task executor: {}", e)) + let executor = Executor::new().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e)) })?; loop { let (artifact_path, params) = recv_request(&mut stream).await?; @@ -204,14 +204,14 @@ pub fn worker_entrypoint(socket_path: &str) { async fn validate_using_artifact( artifact_path: &Path, params: &[u8], - spawner: &TaskExecutor, + executor: &Executor, ) -> Response { let validation_started_at = Instant::now(); let descriptor_bytes = match unsafe { // SAFETY: this should be safe since the compiled artifact passed here comes from the // file created by the prepare workers. These files are obtained by calling // [`executor_intf::prepare`]. 
- crate::executor_intf::execute(artifact_path.as_ref(), params, spawner.clone()) + executor.execute(artifact_path.as_ref(), params) } { Err(err) => return Response::format_invalid("execute", &err.to_string()), Ok(d) => d, diff --git a/node/core/pvf/src/executor_intf.rs b/node/core/pvf/src/executor_intf.rs index 7702923d7918..55e94922ed26 100644 --- a/node/core/pvf/src/executor_intf.rs +++ b/node/core/pvf/src/executor_intf.rs @@ -43,6 +43,9 @@ use std::{ const DEFAULT_HEAP_PAGES_ESTIMATE: u64 = 32; const EXTRA_HEAP_PAGES: u64 = 2048; +/// The number of bytes devoted to the stack during wasm execution of a PVF. +const NATIVE_STACK_MAX: u32 = 256 * 1024 * 1024; + const CONFIG: Config = Config { allow_missing_func_imports: true, cache_path: None, @@ -69,7 +72,7 @@ const CONFIG: Config = Config { // the stack limit set by the wasmtime. deterministic_stack_limit: Some(DeterministicStackLimit { logical_max: 65536, - native_stack_max: 256 * 1024 * 1024, + native_stack_max: NATIVE_STACK_MAX, }), canonicalize_nans: true, // Rationale for turning the multi-threaded compilation off is to make the preparation time @@ -98,20 +101,99 @@ pub fn prepare(blob: RuntimeBlob) -> Result, sc_executor_common::error:: sc_executor_wasmtime::prepare_runtime_artifact(blob, &CONFIG.semantics) } -/// Executes the given PVF in the form of a compiled artifact and returns the result of execution -/// upon success. -/// -/// # Safety -/// -/// The caller must ensure that the compiled artifact passed here was: -/// 1) produced by [`prepare`], -/// 2) written to the disk as a file, -/// 3) was not modified, -/// 4) will not be modified while any runtime using this artifact is alive, or is being -/// instantiated. -/// -/// Failure to adhere to these requirements might lead to crashes and arbitrary code execution. 
-pub unsafe fn execute( +pub struct Executor { + thread_pool: rayon::ThreadPool, + spawner: TaskSpawner, +} + +impl Executor { + pub fn new() -> Result { + // Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code. + // That native code does not create any stacks and just reuses the stack of the thread that + // wasmtime was invoked from. + // + // Also, we configure the executor to provide the deterministic stack and that requires + // supplying the amount of the native stack space that wasm is allowed to use. This is + // realized by supplying the limit into `wasmtime::Config::max_wasm_stack`. + // + // There are quirks to that configuration knob: + // + // 1. It only limits the amount of stack space consumed by wasm but does not ensure nor check + // that the stack space is actually available. + // + // That means, if the calling thread has 1 MiB of stack space left and the wasm code consumes + // more, then the wasmtime limit will **not** trigger. Instead, the wasm code will hit the + // guard page and the Rust stack overflow handler will be triggered. That leads to an + // **abort**. + // + // 2. It cannot and does not limit the stack space consumed by Rust code. + // + // Meaning that if the wasm code leaves no stack space for Rust code, then the Rust code + // will overflow the stack, and that will abort the process as well. + // + // Typically on Linux the main thread gets the stack size specified by the `ulimit`, which + // is usually configured to 8 MiB. Threads spawned by Rust get 2 MiB. OTOH, the + // NATIVE_STACK_MAX is set to 256 MiB, so neither stack is nearly large enough. + // + // Hence we need to increase the stack size of the thread that executes wasm. + // + // The simplest way to fix that is to spawn a thread with the desired stack limit. In order + // to avoid the costs of creating a thread, we use a thread pool. The execution is + // single-threaded, hence the thread pool has only one thread. 
+ // + // The reasoning behind this particular size is: + // + // The default Rust thread stack limit of 2 MiB, plus the 256 MiB wasm stack. + let thread_stack_size = 2 * 1024 * 1024 + NATIVE_STACK_MAX as usize; + let thread_pool = rayon::ThreadPoolBuilder::new() + .num_threads(1) + .stack_size(thread_stack_size) + .build() + .map_err(|e| format!("Failed to create thread pool: {:?}", e))?; + + let spawner = + TaskSpawner::new().map_err(|e| format!("cannot create task spawner: {}", e))?; + + Ok(Self { thread_pool, spawner }) + } + + /// Executes the given PVF in the form of a compiled artifact and returns the result of execution + /// upon success. + /// + /// # Safety + /// + /// The caller must ensure that the compiled artifact passed here was: + /// 1) produced by [`prepare`], + /// 2) written to the disk as a file, + /// 3) was not modified, + /// 4) will not be modified while any runtime using this artifact is alive, or is being + /// instantiated. + /// + /// Failure to adhere to these requirements might lead to crashes and arbitrary code execution. + pub unsafe fn execute( + &self, + compiled_artifact_path: &Path, + params: &[u8], + ) -> Result, String> { + let spawner = self.spawner.clone(); + let mut result = None; + self.thread_pool.scope({ + let result = &mut result; + move |s| { + s.spawn(move |_| { + // spawn does not return a value, so we need to use a variable to pass the result. + *result = Some( + do_execute(compiled_artifact_path, params, spawner) + .map_err(|err| format!("execute error: {:?}", err)), + ); + }); + } + }); + result.unwrap_or_else(|| Err("rayon thread pool spawn failed".to_string())) + } +} + +unsafe fn do_execute( compiled_artifact_path: &Path, params: &[u8], spawner: impl sp_core::traits::SpawnNamed + 'static, @@ -291,9 +373,9 @@ impl sp_externalities::ExtensionStore for ValidationExternalities { /// /// This is a light handle meaning it will only clone the handle not create a new thread pool. 
#[derive(Clone)] -pub(crate) struct TaskExecutor(futures::executor::ThreadPool); +pub(crate) struct TaskSpawner(futures::executor::ThreadPool); -impl TaskExecutor { +impl TaskSpawner { pub(crate) fn new() -> Result { futures::executor::ThreadPoolBuilder::new() .pool_size(4) @@ -304,7 +386,7 @@ impl TaskExecutor { } } -impl sp_core::traits::SpawnNamed for TaskExecutor { +impl sp_core::traits::SpawnNamed for TaskSpawner { fn spawn_blocking( &self, _task_name: &'static str, diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index ca90285ca7b3..a9124b3926c5 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -30,7 +30,7 @@ use async_std::{ }; use parity_scale_codec::{Decode, Encode}; use sp_core::hexdisplay::HexDisplay; -use std::{any::Any, panic, sync::Arc, time::Duration}; +use std::{panic, sync::Arc, time::Duration}; /// The time period after which the preparation worker is considered unresponsive and will be killed. // NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric. @@ -294,20 +294,8 @@ fn prepare_artifact(code: &[u8]) -> Result { Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))), } }) - .map_err(|panic_payload| PrepareError::Panic(stringify_panic_payload(panic_payload))) + .map_err(|panic_payload| { + PrepareError::Panic(crate::error::stringify_panic_payload(panic_payload)) + }) .and_then(|inner_result| inner_result) } - -/// Attempt to convert an opaque panic payload to a string. -/// -/// This is a best effort, and is not guaranteed to provide the most accurate value. -fn stringify_panic_payload(payload: Box) -> String { - match payload.downcast::<&'static str>() { - Ok(msg) => msg.to_string(), - Err(payload) => match payload.downcast::() { - Ok(msg) => *msg, - // At least we tried... 
- Err(_) => "unkown panic payload".to_string(), - }, - } -} diff --git a/node/core/pvf/src/testing.rs b/node/core/pvf/src/testing.rs index ba407e0b87cb..3b64d130fc6a 100644 --- a/node/core/pvf/src/testing.rs +++ b/node/core/pvf/src/testing.rs @@ -29,7 +29,7 @@ pub fn validate_candidate( code: &[u8], params: &[u8], ) -> Result, Box> { - use crate::executor_intf::{execute, prepare, prevalidate, TaskExecutor}; + use crate::executor_intf::{prepare, prevalidate, Executor}; let code = sp_maybe_compressed_blob::decompress(code, 10 * 1024 * 1024) .expect("Decompressing code failed"); @@ -40,11 +40,11 @@ pub fn validate_candidate( let artifact_path = tmpdir.path().join("blob"); std::fs::write(&artifact_path, &artifact)?; - let executor = TaskExecutor::new()?; + let executor = Executor::new()?; let result = unsafe { // SAFETY: This is trivially safe since the artifact is obtained by calling `prepare` // and is written into a temporary directory in an unmodified state. - execute(&artifact_path, params, executor)? + executor.execute(&artifact_path, params)? }; Ok(result)