diff --git a/Cargo.lock b/Cargo.lock index 8c130152d96e..2fdc862bc031 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1896,6 +1896,31 @@ dependencies = [ "instant", ] +[[package]] +name = "fatality" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ad875162843b0d046276327afe0136e9ed3a23d5a754210fb6f1f33610d39ab" +dependencies = [ + "fatality-proc-macro", + "thiserror", +] + +[[package]] +name = "fatality-proc-macro" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5aa1e3ae159e592ad222dc90c5acbad632b527779ba88486abe92782ab268bd" +dependencies = [ + "expander", + "indexmap", + "proc-macro-crate 1.1.3", + "proc-macro2", + "quote", + "syn", + "thiserror", +] + [[package]] name = "fdlimit" version = "0.2.1" @@ -6226,6 +6251,7 @@ version = "0.9.17" dependencies = [ "assert_matches", "derive_more", + "fatality", "futures 0.3.21", "futures-timer", "lru 0.7.2", @@ -6254,6 +6280,7 @@ version = "0.9.17" dependencies = [ "assert_matches", "env_logger 0.9.0", + "fatality", "futures 0.3.21", "futures-timer", "log", @@ -6337,8 +6364,8 @@ version = "0.9.17" dependencies = [ "always-assert", "assert_matches", - "derive_more", "env_logger 0.9.0", + "fatality", "futures 0.3.21", "futures-timer", "log", @@ -6378,6 +6405,7 @@ dependencies = [ "assert_matches", "async-trait", "derive_more", + "fatality", "futures 0.3.21", "futures-timer", "lazy_static", @@ -6656,6 +6684,7 @@ name = "polkadot-node-core-dispute-coordinator" version = "0.9.17" dependencies = [ "assert_matches", + "fatality", "futures 0.3.21", "kvdb", "kvdb-memorydb", @@ -6838,7 +6867,7 @@ name = "polkadot-node-network-protocol" version = "0.9.17" dependencies = [ "async-trait", - "derive_more", + "fatality", "futures 0.3.21", "parity-scale-codec", "polkadot-node-jaeger", @@ -6925,6 +6954,7 @@ dependencies = [ "async-trait", "derive_more", "env_logger 0.9.0", + "fatality", "futures 0.3.21", "itertools", "lazy_static", @@ -7429,7 +7459,7 @@ version = "0.9.17" dependencies = [ "arrayvec 0.5.2", "assert_matches", - "derive_more", + "fatality", "futures 0.3.21", "futures-timer", "indexmap", @@ -11558,7 +11588,7 @@ version = "1.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ee73e6e4924fe940354b8d4d98cad5231175d615cd855b758adc658c0aac6a0" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "digest 0.10.2", "rand 0.8.5", "static_assertions", diff --git a/node/core/dispute-coordinator/Cargo.toml b/node/core/dispute-coordinator/Cargo.toml index 5dcc3d35b03e..e3a4ddfa7744 100644 --- a/node/core/dispute-coordinator/Cargo.toml +++ b/node/core/dispute-coordinator/Cargo.toml @@ -11,6 +11,7 @@ parity-scale-codec = "3.0.0" kvdb = "0.11.0" thiserror = "1.0.30" lru = "0.7.2" +fatality = "0.0.6" polkadot-primitives = { path = "../../../primitives" } polkadot-node-primitives = { path = "../../primitives" } @@ -19,6 +20,7 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" } sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } + [dev-dependencies] kvdb-memorydb = "0.11.0" polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } diff --git a/node/core/dispute-coordinator/src/dummy.rs b/node/core/dispute-coordinator/src/dummy.rs index 4f56a0fb9bba..ee58e617b0b5 100644 --- a/node/core/dispute-coordinator/src/dummy.rs +++ b/node/core/dispute-coordinator/src/dummy.rs @@ -24,7 +24,8 @@ use polkadot_primitives::v1::BlockNumber; use futures::prelude::*; -use crate::error::{Error, Result}; +use crate::error::Result; +use fatality::Nested; const LOG_TARGET: &str = "parachain::dispute-coordinator"; @@ -62,13 +63,16 @@ where { loop { let res = run_until_error(&mut ctx, &subsystem).await; - match res { - Err(e) => - if let Error::Fatal(_) = e { - break - }, - Ok(()) => { - tracing::info!(target: LOG_TARGET, "received `Conclude` signal, exiting"); + match res.into_nested() { + Err(fatal) => { + tracing::error!(target: LOG_TARGET, "Observed fatal issue: {:?}", fatal); + break + }, + Ok(Err(jfyi)) => { + tracing::debug!(target: LOG_TARGET, "Observed issue: {:?}", jfyi); + }, + Ok(Ok(())) => { + tracing::info!(target: LOG_TARGET, "Received `Conclude` signal, exiting"); break }, } diff --git a/node/core/dispute-coordinator/src/error.rs b/node/core/dispute-coordinator/src/error.rs index ad7115db45b0..f2445640a239 100644 --- a/node/core/dispute-coordinator/src/error.rs +++ b/node/core/dispute-coordinator/src/error.rs @@ -14,90 +14,64 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use fatality::Nested; use futures::channel::oneshot; -use thiserror::Error; -use polkadot_node_subsystem::{ - errors::{ChainApiError, RuntimeApiError}, - SubsystemError, -}; +use polkadot_node_subsystem::{errors::ChainApiError, SubsystemError}; use polkadot_node_subsystem_util::{rolling_session_window::SessionsUnavailable, runtime}; -use crate::LOG_TARGET; +use crate::{real::participation, LOG_TARGET}; use parity_scale_codec::Error as CodecError; -/// Errors for this subsystem. -#[derive(Debug, Error)] -#[error(transparent)] -pub enum Error { - /// All fatal errors. - Fatal(#[from] Fatal), - /// All nonfatal/potentially recoverable errors. - NonFatal(#[from] NonFatal), -} - -/// General `Result` type for dispute coordinator. -pub type Result = std::result::Result; -/// Result type with only fatal errors. -pub type FatalResult = std::result::Result; -/// Result type with only non fatal errors. -pub type NonFatalResult = std::result::Result; - -impl From for Error { - fn from(o: runtime::Error) -> Self { - match o { - runtime::Error::Fatal(f) => Self::Fatal(Fatal::Runtime(f)), - runtime::Error::NonFatal(f) => Self::NonFatal(NonFatal::Runtime(f)), - } - } -} - -impl From for Error { - fn from(o: SubsystemError) -> Self { - match o { - SubsystemError::Context(msg) => Self::Fatal(Fatal::SubsystemContext(msg)), - _ => Self::NonFatal(NonFatal::Subsystem(o)), - } - } -} - -/// Fatal errors of this subsystem. -#[derive(Debug, Error)] -pub enum Fatal { - /// Errors coming from runtime::Runtime. - #[error("Error while accessing runtime information {0}")] - Runtime(#[from] runtime::Fatal), +pub type Result = std::result::Result; +pub type FatalResult = std::result::Result; +pub type JfyiResult = std::result::Result; +#[allow(missing_docs)] +#[fatality::fatality(splitable)] +pub enum Error { /// We received a legacy `SubystemError::Context` error which is considered fatal. + #[fatal] #[error("SubsystemError::Context error: {0}")] SubsystemContext(String), /// `ctx.spawn` failed with an error. + #[fatal] #[error("Spawning a task failed: {0}")] - SpawnFailed(SubsystemError), + SpawnFailed(#[source] SubsystemError), + #[fatal] #[error("Participation worker receiver exhausted.")] ParticipationWorkerReceiverExhausted, /// Receiving subsystem message from overseer failed. + #[fatal] #[error("Receiving message from overseer failed: {0}")] SubsystemReceive(#[source] SubsystemError), + #[fatal] #[error("Writing to database failed: {0}")] DbWriteFailed(std::io::Error), - #[error("Oneshot for receiving response from chain API got cancelled")] - ChainApiSenderDropped, + #[fatal] + #[error("Oneshot for receiving block number from chain API got cancelled")] + CanceledBlockNumber, - #[error("Retrieving response from chain API unexpectedly failed with error: {0}")] - ChainApi(#[from] ChainApiError), -} + #[fatal] + #[error("Retrieving block number from chain API failed with error: {0}")] + ChainApiBlockNumber(ChainApiError), -#[derive(Debug, thiserror::Error)] -#[allow(missing_docs)] -pub enum NonFatal { + #[fatal] #[error(transparent)] - RuntimeApi(#[from] RuntimeApiError), + ChainApiAncestors(ChainApiError), + + #[fatal] + #[error("Chain API dropped response channel sender")] + ChainApiSenderDropped, + + #[fatal(forward)] + #[error("Error while accessing runtime information {0}")] + Runtime(#[from] runtime::Error), #[error(transparent)] ChainApi(#[from] ChainApiError), @@ -112,7 +86,7 @@ pub enum NonFatal { DisputeImportOneshotSend, #[error(transparent)] - Subsystem(SubsystemError), + Subsystem(#[from] SubsystemError), #[error(transparent)] Codec(#[from] CodecError), @@ -121,36 +95,32 @@ pub enum NonFatal { #[error("Sessions unavailable in `RollingSessionWindow`: {0}")] RollingSessionWindow(#[from] SessionsUnavailable), - /// Errors coming from runtime::Runtime. - #[error("Error while accessing runtime information: {0}")] - Runtime(#[from] runtime::NonFatal), - #[error(transparent)] - QueueError(#[from] crate::real::participation::QueueError), + QueueError(#[from] participation::QueueError), } /// Utility for eating top level errors and log them. /// /// We basically always want to try and continue on error. This utility function is meant to /// consume top-level errors by simply logging them -pub fn log_error(result: Result<()>) -> std::result::Result<(), Fatal> { - match result { - Err(Error::Fatal(f)) => Err(f), - Err(Error::NonFatal(error)) => { - error.log(); +pub fn log_error(result: Result<()>) -> std::result::Result<(), FatalError> { + match result.into_nested()? { + Ok(()) => Ok(()), + Err(jfyi) => { + jfyi.log(); Ok(()) }, - Ok(()) => Ok(()), } } -impl NonFatal { - /// Log a `NonFatal`. +impl JfyiError { + /// Log a `JfyiError`. pub fn log(self) { match self { // don't spam the log with spurious errors - Self::RuntimeApi(_) | Self::Oneshot(_) => - tracing::debug!(target: LOG_TARGET, error = ?self), + Self::Runtime(_) | Self::Oneshot(_) => { + tracing::debug!(target: LOG_TARGET, error = ?self) + }, // it's worth reporting otherwise _ => tracing::warn!(target: LOG_TARGET, error = ?self), } diff --git a/node/core/dispute-coordinator/src/real/db/v1.rs b/node/core/dispute-coordinator/src/real/db/v1.rs index 05d58e88f286..bb8ce761281b 100644 --- a/node/core/dispute-coordinator/src/real/db/v1.rs +++ b/node/core/dispute-coordinator/src/real/db/v1.rs @@ -28,15 +28,14 @@ use kvdb::{DBTransaction, KeyValueDB}; use parity_scale_codec::{Decode, Encode}; use crate::{ - error::{Fatal, FatalResult}, + error::{FatalError, FatalResult}, + real::{ + backend::{Backend, BackendWriteOp, OverlayedBackend}, + DISPUTE_WINDOW, + }, status::DisputeStatus, }; -use crate::real::{ - backend::{Backend, BackendWriteOp, OverlayedBackend}, - DISPUTE_WINDOW, -}; - const RECENT_DISPUTES_KEY: &[u8; 15] = b"recent-disputes"; const EARLIEST_SESSION_KEY: &[u8; 16] = b"earliest-session"; const CANDIDATE_VOTES_SUBKEY: &[u8; 15] = b"candidate-votes"; @@ -100,7 +99,7 @@ impl Backend for DbBackend { } } - self.inner.write(tx).map_err(Fatal::DbWriteFailed) + self.inner.write(tx).map_err(FatalError::DbWriteFailed) } } @@ -168,8 +167,8 @@ pub enum Error { impl From for crate::error::Error { fn from(err: Error) -> Self { match err { - Error::Io(io) => Self::NonFatal(crate::error::NonFatal::Io(io)), - Error::Codec(e) => Self::NonFatal(crate::error::NonFatal::Codec(e)), + Error::Io(io) => Self::Io(io), + Error::Codec(e) => Self::Codec(e), } } } diff --git a/node/core/dispute-coordinator/src/real/initialized.rs b/node/core/dispute-coordinator/src/real/initialized.rs index 8123aaada4ee..42ee0ae46217 100644 --- a/node/core/dispute-coordinator/src/real/initialized.rs +++ b/node/core/dispute-coordinator/src/real/initialized.rs @@ -53,7 +53,7 @@ use polkadot_primitives::{ }; use crate::{ - error::{log_error, Error, Fatal, FatalResult, NonFatal, NonFatalResult, Result}, + error::{log_error, Error, FatalError, FatalResult, JfyiError, JfyiResult, Result}, metrics::Metrics, real::{ordering::get_finalized_block_number, DisputeCoordinatorSubsystem}, status::{get_active_with_status, Clock, DisputeStatus, Timestamp}, @@ -610,7 +610,7 @@ impl Initialized { overlay_db: &mut OverlayedBackend<'_, impl Backend>, message: DisputeCoordinatorMessage, now: Timestamp, - ) -> Result NonFatalResult<()>>> { + ) -> Result JfyiResult<()>>> { match message { DisputeCoordinatorMessage::ImportStatements { candidate_hash, @@ -633,7 +633,7 @@ impl Initialized { let report = move || { pending_confirmation .send(outcome) - .map_err(|_| NonFatal::DisputeImportOneshotSend) + .map_err(|_| JfyiError::DisputeImportOneshotSend) }; match outcome { ImportStatementsResult::InvalidImport => { @@ -733,7 +733,7 @@ impl Initialized { // Helper function for checking subsystem errors in message processing. fn ensure_available_session_info(&self) -> Result<()> { if let Some(subsystem_error) = self.error.clone() { - return Err(Error::NonFatal(NonFatal::RollingSessionWindow(subsystem_error))) + return Err(Error::RollingSessionWindow(subsystem_error)) } Ok(()) @@ -1174,8 +1174,8 @@ impl MuxedMessage { let from_overseer = ctx.recv().fuse(); futures::pin_mut!(from_overseer, from_sender); futures::select!( - msg = from_overseer => Ok(Self::Subsystem(msg.map_err(Fatal::SubsystemReceive)?)), - msg = from_sender.next() => Ok(Self::Participation(msg.ok_or(Fatal::ParticipationWorkerReceiverExhausted)?)), + msg = from_overseer => Ok(Self::Subsystem(msg.map_err(FatalError::SubsystemReceive)?)), + msg = from_sender.next() => Ok(Self::Participation(msg.ok_or(FatalError::ParticipationWorkerReceiverExhausted)?)), ) } } diff --git a/node/core/dispute-coordinator/src/real/mod.rs b/node/core/dispute-coordinator/src/real/mod.rs index 6d6d7be85abc..064ead5b6b26 100644 --- a/node/core/dispute-coordinator/src/real/mod.rs +++ b/node/core/dispute-coordinator/src/real/mod.rs @@ -40,13 +40,13 @@ use polkadot_node_subsystem_util::rolling_session_window::RollingSessionWindow; use polkadot_primitives::v1::{ValidatorIndex, ValidatorPair}; use crate::{ - error::{Error, FatalResult, NonFatal, Result}, + error::{FatalResult, JfyiError, Result}, metrics::Metrics, status::{get_active_with_status, SystemClock}, }; - use backend::{Backend, OverlayedBackend}; use db::v1::DbBackend; +use fatality::Split; use self::{ ordering::CandidateComparator, @@ -196,9 +196,8 @@ impl DisputeCoordinatorSubsystem { tracing::info!(target: LOG_TARGET, "received `Conclude` signal, exiting"); return Ok(None) }, - Err(Error::Fatal(f)) => return Err(f), - Err(Error::NonFatal(e)) => { - e.log(); + Err(e) => { + e.split()?.log(); continue }, }; @@ -219,9 +218,8 @@ impl DisputeCoordinatorSubsystem { .await { Ok(v) => v, - Err(Error::Fatal(f)) => return Err(f), - Err(Error::NonFatal(e)) => { - e.log(); + Err(e) => { + e.split()?.log(); continue }, }; @@ -371,7 +369,7 @@ where leaf.clone(), RollingSessionWindow::new(ctx, DISPUTE_WINDOW, leaf.hash) .await - .map_err(NonFatal::RollingSessionWindow)?, + .map_err(JfyiError::RollingSessionWindow)?, ))) } else { Ok(None) @@ -401,11 +399,13 @@ where // available). So instead of telling subsystems, everything is fine, because of an // hour old database state, we should rather cancel contained oneshots and delay // finality until we are fully functional. + { tracing::warn!( target: LOG_TARGET, ?msg, "Received msg before first active leaves update. This is not expected - message will be dropped." - ), + ) + }, } } } diff --git a/node/core/dispute-coordinator/src/real/ordering/mod.rs b/node/core/dispute-coordinator/src/real/ordering/mod.rs index 52650a9cd252..3b7532135a11 100644 --- a/node/core/dispute-coordinator/src/real/ordering/mod.rs +++ b/node/core/dispute-coordinator/src/real/ordering/mod.rs @@ -29,7 +29,7 @@ use polkadot_node_subsystem_util::runtime::get_candidate_events; use polkadot_primitives::v1::{BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, Hash}; use crate::{ - error::{Fatal, FatalResult, Result}, + error::{FatalError, FatalResult, Result}, LOG_TARGET, }; @@ -182,7 +182,7 @@ impl OrderingProvider { &mut self, sender: &mut Sender, update: &ActiveLeavesUpdate, - ) -> Result<()> { + ) -> crate::error::Result<()> { if let Some(activated) = update.activated.as_ref() { // Fetch last finalized block. let ancestors = match get_finalized_block_number(sender).await { @@ -299,7 +299,9 @@ impl OrderingProvider { ) .await; - rx.await.or(Err(Fatal::ChainApiSenderDropped))?.map_err(Fatal::ChainApi)? + rx.await + .or(Err(FatalError::ChainApiSenderDropped))? + .map_err(FatalError::ChainApiAncestors)? }; let earliest_block_number = match head_number.checked_sub(hashes.len() as u32) { @@ -356,8 +358,8 @@ where receiver .await - .map_err(|_| Fatal::ChainApiSenderDropped)? - .map_err(Fatal::ChainApi) + .map_err(|_| FatalError::ChainApiSenderDropped)? + .map_err(FatalError::ChainApiAncestors) } async fn get_block_number( diff --git a/node/core/dispute-coordinator/src/real/participation/mod.rs b/node/core/dispute-coordinator/src/real/participation/mod.rs index f2f3862ab5ed..1041a380a29f 100644 --- a/node/core/dispute-coordinator/src/real/participation/mod.rs +++ b/node/core/dispute-coordinator/src/real/participation/mod.rs @@ -29,12 +29,10 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_util::runtime::get_validation_code_by_hash; use polkadot_primitives::v1::{BlockNumber, CandidateHash, CandidateReceipt, Hash, SessionIndex}; -use crate::{ - error::{Fatal, FatalResult, NonFatal, Result}, - LOG_TARGET, -}; +use crate::real::LOG_TARGET; use super::ordering::CandidateComparator; +use crate::error::{FatalError, FatalResult, JfyiError, Result}; #[cfg(test)] mod tests; @@ -43,7 +41,7 @@ pub use tests::{participation_full_happy_path, participation_missing_availabilit mod queues; use queues::Queues; -pub use queues::{Error as QueueError, ParticipationRequest}; +pub use queues::{ParticipationRequest, QueueError}; /// How many participation processes do we want to run in parallel the most. /// @@ -161,7 +159,7 @@ impl Participation { } } // Out of capacity/no recent block yet - queue: - Ok(self.queue.queue(comparator, req).map_err(NonFatal::QueueError)?) + Ok(self.queue.queue(comparator, req).map_err(JfyiError::QueueError)?) } /// Message from a worker task was received - get the outcome. @@ -239,7 +237,7 @@ impl Participation { "participation-worker", participate(self.worker_sender.clone(), sender, recent_head, req).boxed(), ) - .map_err(Fatal::SpawnFailed)?; + .map_err(FatalError::SpawnFailed)?; } Ok(()) } diff --git a/node/core/dispute-coordinator/src/real/participation/queues/mod.rs b/node/core/dispute-coordinator/src/real/participation/queues/mod.rs index e1cac851f4bc..dbdf00b77dae 100644 --- a/node/core/dispute-coordinator/src/real/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/real/participation/queues/mod.rs @@ -16,8 +16,6 @@ use std::collections::{BTreeMap, HashMap}; -use thiserror::Error; - use polkadot_primitives::v1::{CandidateHash, CandidateReceipt, SessionIndex}; use crate::real::ordering::CandidateComparator; @@ -83,8 +81,8 @@ struct BestEffortEntry { } /// What can go wrong when queuing a request. -#[derive(Debug, Error)] -pub enum Error { +#[derive(Debug, thiserror::Error)] +pub enum QueueError { #[error("Request could not be queued, because best effort queue was already full.")] BestEffortFull, #[error("Request could not be queued, because priority queue was already full.")] @@ -137,21 +135,21 @@ impl Queues { &mut self, comparator: Option, req: ParticipationRequest, - ) -> Result<(), Error> { + ) -> Result<(), QueueError> { debug_assert!(comparator .map(|c| c.matches_candidate(req.candidate_hash())) .unwrap_or(true)); if let Some(comparator) = comparator { if self.priority.len() >= PRIORITY_QUEUE_SIZE { - return Err(Error::PriorityFull) + return Err(QueueError::PriorityFull) } // Remove any best effort entry: self.best_effort.remove(&req.candidate_hash); self.priority.insert(comparator, req); } else { if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE { - return Err(Error::BestEffortFull) + return Err(QueueError::BestEffortFull) } // Note: The request might have been added to priority in a previous call already, we // take care of that case in `dequeue` (more efficient). diff --git a/node/core/dispute-coordinator/src/real/participation/queues/tests.rs b/node/core/dispute-coordinator/src/real/participation/queues/tests.rs index 45c4dc5ded2e..3d5759a66d6b 100644 --- a/node/core/dispute-coordinator/src/real/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/real/participation/queues/tests.rs @@ -20,7 +20,7 @@ use polkadot_primitives::v1::{BlockNumber, Hash}; use crate::real::ordering::CandidateComparator; -use super::{Error, ParticipationRequest, Queues}; +use super::{ParticipationRequest, QueueError, Queues}; /// Make a `ParticipationRequest` based on the given commitments hash. fn make_participation_request(hash: Hash) -> ParticipationRequest { @@ -64,9 +64,9 @@ fn ordering_works_as_expected() { queue.queue(None, req5.clone()).unwrap(); assert_matches!( queue.queue(Some(make_dummy_comparator(&req_prio_full, 3)), req_prio_full), - Err(Error::PriorityFull) + Err(QueueError::PriorityFull) ); - assert_matches!(queue.queue(None, req_full), Err(Error::BestEffortFull)); + assert_matches!(queue.queue(None, req_full), Err(QueueError::BestEffortFull)); assert_eq!(queue.dequeue(), Some(req_prio)); assert_eq!(queue.dequeue(), Some(req_prio_2)); diff --git a/node/core/dispute-coordinator/src/real/spam_slots.rs b/node/core/dispute-coordinator/src/real/spam_slots.rs index b58b812b042a..7818b112411d 100644 --- a/node/core/dispute-coordinator/src/real/spam_slots.rs +++ b/node/core/dispute-coordinator/src/real/spam_slots.rs @@ -64,14 +64,14 @@ impl SpamSlots { let mut slots: HashMap<(SessionIndex, ValidatorIndex), SpamCount> = HashMap::new(); for ((session, _), validators) in unconfirmed_disputes.iter() { for validator in validators { - let e = slots.entry((*session, *validator)).or_default(); - *e += 1; - if *e > MAX_SPAM_VOTES { + let spam_vote_count = slots.entry((*session, *validator)).or_default(); + *spam_vote_count += 1; + if *spam_vote_count > MAX_SPAM_VOTES { tracing::debug!( target: LOG_TARGET, ?session, ?validator, - count = ?e, + count = ?spam_vote_count, "Import exceeded spam slot for validator" ); } @@ -93,8 +93,8 @@ impl SpamSlots { candidate: CandidateHash, validator: ValidatorIndex, ) -> bool { - let c = self.slots.entry((session, validator)).or_default(); - if *c >= MAX_SPAM_VOTES { + let spam_vote_count = self.slots.entry((session, validator)).or_default(); + if *spam_vote_count >= MAX_SPAM_VOTES { return false } let validators = self.unconfirmed.entry((session, candidate)).or_default(); @@ -103,7 +103,7 @@ impl SpamSlots { // We only increment spam slots once per candidate, as each validator has to provide an // opposing vote for sending out its own vote. Therefore, receiving multiple votes for // a single candidate is expected and should not get punished here. - *c += 1; + *spam_vote_count += 1; } true @@ -118,8 +118,8 @@ impl SpamSlots { if let Some(validators) = self.unconfirmed.remove(key) { let (session, _) = key; for validator in validators { - if let Some(c) = self.slots.remove(&(*session, validator)) { - let new = c - 1; + if let Some(spam_vote_count) = self.slots.remove(&(*session, validator)) { + let new = spam_vote_count - 1; if new > 0 { self.slots.insert((*session, validator), new); } diff --git a/node/core/parachains-inherent/src/lib.rs b/node/core/parachains-inherent/src/lib.rs index b15afb831065..fdccb8321dd3 100644 --- a/node/core/parachains-inherent/src/lib.rs +++ b/node/core/parachains-inherent/src/lib.rs @@ -117,8 +117,8 @@ impl sp_inherents::InherentDataProvider for ParachainsInherentDataProvider { async fn try_handle_error( &self, - _: &sp_inherents::InherentIdentifier, - _: &[u8], + _identifier: &sp_inherents::InherentIdentifier, + _error: &[u8], ) -> Option> { // Inherent isn't checked and can not return any error None diff --git a/node/network/availability-distribution/Cargo.toml b/node/network/availability-distribution/Cargo.toml index 57cde892cb90..e87d12cb1221 100644 --- a/node/network/availability-distribution/Cargo.toml +++ b/node/network/availability-distribution/Cargo.toml @@ -20,6 +20,7 @@ thiserror = "1.0.30" rand = "0.8.5" derive_more = "0.99.17" lru = "0.7.2" +fatality = "0.0.6" [dev-dependencies] polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpers", path = "../../subsystem-test-helpers" } diff --git a/node/network/availability-distribution/src/error.rs b/node/network/availability-distribution/src/error.rs index 4ecec30ae574..920eaef2395a 100644 --- a/node/network/availability-distribution/src/error.rs +++ b/node/network/availability-distribution/src/error.rs @@ -17,9 +17,9 @@ //! Error handling related code and Error/Result definitions. +use fatality::Nested; use polkadot_node_network_protocol::request_response::outgoing::RequestError; use polkadot_primitives::v1::SessionIndex; -use thiserror::Error; use futures::channel::oneshot; @@ -28,116 +28,86 @@ use polkadot_subsystem::{ChainApiError, SubsystemError}; use crate::LOG_TARGET; -#[derive(Debug, Error, derive_more::From)] -#[error(transparent)] +#[allow(missing_docs)] +#[fatality::fatality(splitable)] pub enum Error { - /// All fatal errors. - Fatal(Fatal), - /// All nonfatal/potentially recoverable errors. - NonFatal(NonFatal), -} - -impl From for Error { - fn from(o: runtime::Error) -> Self { - match o { - runtime::Error::Fatal(f) => Self::Fatal(Fatal::Runtime(f)), - runtime::Error::NonFatal(f) => Self::NonFatal(NonFatal::Runtime(f)), - } - } -} - -/// Fatal errors of this subsystem. -#[derive(Debug, Error)] -pub enum Fatal { - /// Spawning a running task failed. + #[fatal] #[error("Spawning subsystem task failed: {0}")] SpawnTask(#[source] SubsystemError), - /// Requester stream exhausted. + #[fatal] #[error("Erasure chunk requester stream exhausted")] RequesterExhausted, + #[fatal] #[error("Receive channel closed: {0}")] IncomingMessageChannel(#[source] SubsystemError), - /// Errors coming from runtime::Runtime. + #[fatal(forward)] #[error("Error while accessing runtime information: {0}")] - Runtime(#[from] runtime::Fatal), + Runtime(#[from] runtime::Error), + #[fatal] #[error("Oneshot for receiving response from Chain API got cancelled")] ChainApiSenderDropped(#[source] oneshot::Canceled), + #[fatal] #[error("Retrieving response from Chain API unexpectedly failed with error: {0}")] ChainApi(#[from] ChainApiError), -} -/// Non-fatal errors of this subsystem. -#[derive(Debug, Error)] -pub enum NonFatal { - /// av-store will drop the sender on any error that happens. + // av-store will drop the sender on any error that happens. #[error("Response channel to obtain chunk failed")] QueryChunkResponseChannel(#[source] oneshot::Canceled), - /// av-store will drop the sender on any error that happens. + // av-store will drop the sender on any error that happens. #[error("Response channel to obtain available data failed")] QueryAvailableDataResponseChannel(#[source] oneshot::Canceled), - /// We tried accessing a session that was not cached. + // We tried accessing a session that was not cached. #[error("Session {missing_session} is not cached, cached sessions: {available_sessions:?}.")] NoSuchCachedSession { available_sessions: Vec, missing_session: SessionIndex }, - /// Sending request response failed (Can happen on timeouts for example). + // Sending request response failed (Can happen on timeouts for example). #[error("Sending a request's response failed.")] SendResponse, - /// Fetching PoV failed with `RequestError`. #[error("FetchPoV request error: {0}")] FetchPoV(#[source] RequestError), - /// Fetching PoV failed as the received PoV did not match the expected hash. #[error("Fetched PoV does not match expected hash")] UnexpectedPoV, #[error("Remote responded with `NoSuchPoV`")] NoSuchPoV, - /// No validator with the index could be found in current session. - #[error("Given validator index could not be found")] + #[error("Given validator index could not be found in current session")] InvalidValidatorIndex, - - /// Errors coming from runtime::Runtime. - #[error("Error while accessing runtime information: {0}")] - Runtime(#[from] runtime::NonFatal), } -/// General result type for fatal/nonfatal errors. +/// General result abbreviation type alias. pub type Result = std::result::Result; -/// Results which are never fatal. -pub type NonFatalResult = std::result::Result; - /// Utility for eating top level errors and log them. /// /// We basically always want to try and continue on error. This utility function is meant to /// consume top-level errors by simply logging them -pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), Fatal> { - match result { - Err(Error::Fatal(f)) => Err(f), - Err(Error::NonFatal(error)) => { - match error { - NonFatal::UnexpectedPoV | - NonFatal::InvalidValidatorIndex | - NonFatal::NoSuchCachedSession { .. } | - NonFatal::QueryAvailableDataResponseChannel(_) | - NonFatal::QueryChunkResponseChannel(_) => - tracing::warn!(target: LOG_TARGET, error = %error, ctx), - NonFatal::FetchPoV(_) | - NonFatal::SendResponse | - NonFatal::NoSuchPoV | - NonFatal::Runtime(_) => tracing::debug!(target: LOG_TARGET, error = ?error, ctx), +pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), FatalError> { + match result.into_nested()? { + Ok(()) => Ok(()), + Err(jfyi) => { + match jfyi { + JfyiError::UnexpectedPoV | + JfyiError::InvalidValidatorIndex | + JfyiError::NoSuchCachedSession { .. } | + JfyiError::QueryAvailableDataResponseChannel(_) | + JfyiError::QueryChunkResponseChannel(_) => + tracing::warn!(target: LOG_TARGET, error = %jfyi, ctx), + JfyiError::FetchPoV(_) | + JfyiError::SendResponse | + JfyiError::NoSuchPoV | + JfyiError::Runtime(_) => tracing::debug!(target: LOG_TARGET, error = ?jfyi, ctx), } Ok(()) }, - Ok(()) => Ok(()), } } diff --git a/node/network/availability-distribution/src/lib.rs b/node/network/availability-distribution/src/lib.rs index 2b0c76799edc..c09fb208df41 100644 --- a/node/network/availability-distribution/src/lib.rs +++ b/node/network/availability-distribution/src/lib.rs @@ -26,7 +26,7 @@ use polkadot_subsystem::{ /// Error and [`Result`] type for this subsystem. mod error; -use error::{log_error, Fatal, Result}; +use error::{log_error, FatalError, Result}; use polkadot_node_subsystem_util::runtime::RuntimeInfo; @@ -95,7 +95,7 @@ impl AvailabilityDistributionSubsystem { } /// Start processing work as passed on from the Overseer. - async fn run(self, mut ctx: Context) -> std::result::Result<(), Fatal> + async fn run(self, mut ctx: Context) -> std::result::Result<(), FatalError> where Context: SubsystemContext, Context: overseer::SubsystemContext, @@ -111,13 +111,13 @@ impl AvailabilityDistributionSubsystem { "pov-receiver", run_pov_receiver(sender.clone(), pov_req_receiver, metrics.clone()).boxed(), ) - .map_err(Fatal::SpawnTask)?; + .map_err(FatalError::SpawnTask)?; ctx.spawn( "chunk-receiver", run_chunk_receiver(sender, chunk_req_receiver, metrics.clone()).boxed(), ) - .map_err(Fatal::SpawnTask)?; + .map_err(FatalError::SpawnTask)?; } loop { @@ -132,9 +132,9 @@ impl AvailabilityDistributionSubsystem { // Handle task messages sending: let message = match action { Either::Left(subsystem_msg) => - subsystem_msg.map_err(|e| Fatal::IncomingMessageChannel(e))?, + subsystem_msg.map_err(|e| FatalError::IncomingMessageChannel(e))?, Either::Right(from_task) => { - let from_task = from_task.ok_or(Fatal::RequesterExhausted)?; + let from_task = from_task.ok_or(FatalError::RequesterExhausted)?; ctx.send_message(from_task).await; continue }, diff --git a/node/network/availability-distribution/src/pov_requester/mod.rs b/node/network/availability-distribution/src/pov_requester/mod.rs index 1e860f56b9b0..1e5c8b25ec7a 100644 --- a/node/network/availability-distribution/src/pov_requester/mod.rs +++ b/node/network/availability-distribution/src/pov_requester/mod.rs @@ -33,7 +33,7 @@ use polkadot_subsystem::{ }; use crate::{ - error::{Fatal, NonFatal}, + error::{Error, FatalError, JfyiError, Result}, metrics::{FAILED, NOT_FOUND, SUCCEEDED}, Metrics, LOG_TARGET, }; @@ -48,7 +48,7 @@ pub async fn fetch_pov( pov_hash: Hash, tx: oneshot::Sender, metrics: Metrics, -) -> super::Result<()> +) -> Result<()> where Context: SubsystemContext, { @@ -56,7 +56,7 @@ where let authority_id = info .discovery_keys .get(from_validator.0 as usize) - .ok_or(NonFatal::InvalidValidatorIndex)? + .ok_or(JfyiError::InvalidValidatorIndex)? .clone(); let (req, pending_response) = OutgoingRequest::new( Recipient::Authority(authority_id.clone()), @@ -77,7 +77,7 @@ where "pov-fetcher", fetch_pov_job(pov_hash, authority_id, pending_response.boxed(), span, tx, metrics).boxed(), ) - .map_err(|e| Fatal::SpawnTask(e))?; + .map_err(|e| FatalError::SpawnTask(e))?; Ok(()) } @@ -85,7 +85,7 @@ where async fn fetch_pov_job( pov_hash: Hash, authority_id: AuthorityDiscoveryId, - pending_response: BoxFuture<'static, Result>, + pending_response: BoxFuture<'static, std::result::Result>, span: jaeger::Span, tx: oneshot::Sender, metrics: Metrics, @@ -98,17 +98,17 @@ async fn fetch_pov_job( /// Do the actual work of waiting for the response. async fn do_fetch_pov( pov_hash: Hash, - pending_response: BoxFuture<'static, Result>, + pending_response: BoxFuture<'static, std::result::Result>, _span: jaeger::Span, tx: oneshot::Sender, metrics: Metrics, -) -> std::result::Result<(), NonFatal> { - let response = pending_response.await.map_err(NonFatal::FetchPoV); +) -> Result<()> { + let response = pending_response.await.map_err(Error::FetchPoV); let pov = match response { Ok(PoVFetchingResponse::PoV(pov)) => pov, Ok(PoVFetchingResponse::NoSuchPoV) => { metrics.on_fetched_pov(NOT_FOUND); - return Err(NonFatal::NoSuchPoV) + return Err(Error::NoSuchPoV) }, Err(err) => { metrics.on_fetched_pov(FAILED); @@ -117,10 +117,10 @@ async fn do_fetch_pov( }; if pov.hash() == pov_hash { metrics.on_fetched_pov(SUCCEEDED); - tx.send(pov).map_err(|_| NonFatal::SendResponse) + tx.send(pov).map_err(|_| Error::SendResponse) } else { metrics.on_fetched_pov(FAILED); - Err(NonFatal::UnexpectedPoV) + Err(Error::UnexpectedPoV) } } diff --git a/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/node/network/availability-distribution/src/requester/fetch_task/mod.rs index a05ee0cd2d4c..480c3a889562 100644 --- a/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -39,7 +39,7 @@ use polkadot_subsystem::{ }; use crate::{ - error::{Fatal, Result}, + error::{FatalError, Result}, metrics::{Metrics, FAILED, SUCCEEDED}, requester::session_cache::{BadValidators, SessionInfo}, LOG_TARGET, @@ -185,7 +185,7 @@ impl FetchTask { let (handle, kill) = oneshot::channel(); ctx.spawn("chunk-fetcher", running.run(kill).boxed()) - .map_err(|e| Fatal::SpawnTask(e))?; + .map_err(|e| FatalError::SpawnTask(e))?; Ok(FetchTask { live_in, state: FetchedState::Started(handle) }) } else { diff --git a/node/network/availability-distribution/src/requester/mod.rs b/node/network/availability-distribution/src/requester/mod.rs index 6a9a86321b12..14310b3384fd 100644 --- a/node/network/availability-distribution/src/requester/mod.rs +++ b/node/network/availability-distribution/src/requester/mod.rs @@ -39,8 +39,7 @@ use polkadot_subsystem::{ ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SubsystemContext, }; -use super::{Metrics, Result, LOG_TARGET}; -use crate::error::Fatal; +use super::{FatalError, Metrics, Result, LOG_TARGET}; #[cfg(test)] mod tests; @@ -324,6 +323,9 @@ where }) .await; - let ancestors = rx.await.map_err(Fatal::ChainApiSenderDropped)?.map_err(Fatal::ChainApi)?; + let ancestors = rx + .await + .map_err(FatalError::ChainApiSenderDropped)? + .map_err(FatalError::ChainApi)?; Ok(ancestors) } diff --git a/node/network/availability-distribution/src/requester/session_cache.rs b/node/network/availability-distribution/src/requester/session_cache.rs index 10fda8cd9c6a..7e3406e61769 100644 --- a/node/network/availability-distribution/src/requester/session_cache.rs +++ b/node/network/availability-distribution/src/requester/session_cache.rs @@ -26,7 +26,7 @@ use polkadot_primitives::v1::{ use polkadot_subsystem::SubsystemContext; use crate::{ - error::{Error, NonFatal}, + error::{Error, Result}, LOG_TARGET, }; @@ -100,7 +100,7 @@ impl SessionCache { runtime: &mut RuntimeInfo, parent: Hash, with_info: F, - ) -> Result, Error> + ) -> Result> where Context: SubsystemContext, F: FnOnce(&SessionInfo) -> R, @@ -143,10 +143,10 @@ impl SessionCache { /// /// We assume validators in a group are tried in reverse order, so the reported bad validators /// will be put at the beginning of the group. - pub fn report_bad(&mut self, report: BadValidators) -> crate::Result<()> { + pub fn report_bad(&mut self, report: BadValidators) -> Result<()> { let available_sessions = self.session_info_cache.iter().map(|(k, _)| *k).collect(); let session = self.session_info_cache.get_mut(&report.session_index).ok_or( - NonFatal::NoSuchCachedSession { + Error::NoSuchCachedSession { available_sessions, missing_session: report.session_index, }, @@ -179,7 +179,7 @@ impl SessionCache { runtime: &mut RuntimeInfo, relay_parent: Hash, session_index: SessionIndex, - ) -> Result, Error> + ) -> Result> where Context: SubsystemContext, { diff --git a/node/network/availability-distribution/src/responder.rs b/node/network/availability-distribution/src/responder.rs index a7b956232574..e4e9cb4eed9e 100644 --- a/node/network/availability-distribution/src/responder.rs +++ b/node/network/availability-distribution/src/responder.rs @@ -20,8 +20,9 @@ use std::sync::Arc; use futures::channel::oneshot; +use fatality::Nested; use polkadot_node_network_protocol::{ - request_response::{incoming, v1, IncomingRequest, IncomingRequestReceiver}, + request_response::{v1, IncomingRequest, IncomingRequestReceiver}, UnifiedReputationChange as Rep, }; use polkadot_node_primitives::{AvailableData, ErasureChunk}; @@ -29,7 +30,7 @@ use polkadot_primitives::v1::{CandidateHash, ValidatorIndex}; use polkadot_subsystem::{jaeger, messages::AvailabilityStoreMessage, SubsystemSender}; use crate::{ - error::{NonFatal, NonFatalResult, Result}, + error::{JfyiError, Result}, metrics::{Metrics, FAILED, NOT_FOUND, SUCCEEDED}, LOG_TARGET, }; @@ -45,20 +46,20 @@ pub async fn run_pov_receiver( Sender: SubsystemSender, { loop { - match receiver.recv(|| vec![COST_INVALID_REQUEST]).await { - Ok(msg) => { + match receiver.recv(|| vec![COST_INVALID_REQUEST]).await.into_nested() { + Ok(Ok(msg)) => { answer_pov_request_log(&mut sender, msg, &metrics).await; }, - Err(incoming::Error::Fatal(f)) => { + Err(fatal) => { tracing::debug!( target: LOG_TARGET, - error = ?f, + error = ?fatal, "Shutting down POV receiver." ); return }, - Err(incoming::Error::NonFatal(error)) => { - tracing::debug!(target: LOG_TARGET, ?error, "Error decoding incoming PoV request."); + Ok(Err(jfyi)) => { + tracing::debug!(target: LOG_TARGET, error = ?jfyi, "Error decoding incoming PoV request."); }, } } @@ -73,22 +74,22 @@ pub async fn run_chunk_receiver( Sender: SubsystemSender, { loop { - match receiver.recv(|| vec![COST_INVALID_REQUEST]).await { - Ok(msg) => { + match receiver.recv(|| vec![COST_INVALID_REQUEST]).await.into_nested() { + Ok(Ok(msg)) => { answer_chunk_request_log(&mut sender, msg, &metrics).await; }, - Err(incoming::Error::Fatal(f)) => { + Err(fatal) => { tracing::debug!( target: LOG_TARGET, - error = ?f, + error = ?fatal, "Shutting down chunk receiver." ); return }, - Err(incoming::Error::NonFatal(error)) => { + Ok(Err(jfyi)) => { tracing::debug!( target: LOG_TARGET, - ?error, + error = ?jfyi, "Error decoding incoming chunk request." ); }, @@ -169,7 +170,7 @@ where }, }; - req.send_response(response).map_err(|_| NonFatal::SendResponse)?; + req.send_response(response).map_err(|_| JfyiError::SendResponse)?; Ok(result) } @@ -205,7 +206,7 @@ where Some(chunk) => v1::ChunkFetchingResponse::Chunk(chunk.into()), }; - req.send_response(response).map_err(|_| NonFatal::SendResponse)?; + req.send_response(response).map_err(|_| JfyiError::SendResponse)?; Ok(result) } @@ -214,7 +215,7 @@ async fn query_chunk( sender: &mut Sender, candidate_hash: CandidateHash, validator_index: ValidatorIndex, -) -> NonFatalResult> +) -> std::result::Result, JfyiError> where Sender: SubsystemSender, { @@ -233,7 +234,7 @@ where error = ?e, "Error retrieving chunk", ); - NonFatal::QueryChunkResponseChannel(e) + JfyiError::QueryChunkResponseChannel(e) })?; Ok(result) } @@ -242,7 +243,7 @@ where async fn query_available_data( sender: &mut Sender, candidate_hash: CandidateHash, -) -> NonFatalResult> +) -> Result> where Sender: SubsystemSender, { @@ -251,6 +252,6 @@ where .send_message(AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx).into()) .await; - let result = rx.await.map_err(|e| NonFatal::QueryAvailableDataResponseChannel(e))?; + let result = rx.await.map_err(JfyiError::QueryAvailableDataResponseChannel)?; Ok(result) } diff --git a/node/network/availability-recovery/Cargo.toml b/node/network/availability-recovery/Cargo.toml index 0ac9ce789091..dbf488fb2893 100644 --- a/node/network/availability-recovery/Cargo.toml +++ b/node/network/availability-recovery/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" futures = "0.3.21" lru = "0.7.2" rand = "0.8.5" +fatality = "0.0.6" thiserror = "1.0.30" tracing = "0.1.31" diff --git a/node/network/availability-recovery/src/lib.rs b/node/network/availability-recovery/src/lib.rs index 6c75839daccd..91596d9795de 100644 --- a/node/network/availability-recovery/src/lib.rs +++ b/node/network/availability-recovery/src/lib.rs @@ -36,13 +36,14 @@ use futures::{ use lru::LruCache; use rand::seq::SliceRandom; +use fatality::Nested; use polkadot_erasure_coding::{branch_hash, branches, obtain_chunks_v1, recovery_threshold}; #[cfg(not(test))] use polkadot_node_network_protocol::request_response::CHUNK_REQUEST_TIMEOUT; use polkadot_node_network_protocol::{ request_response::{ - self as req_res, incoming, outgoing::RequestError, v1 as request_v1, - IncomingRequestReceiver, OutgoingRequest, Recipient, Requests, + self as req_res, outgoing::RequestError, v1 as request_v1, IncomingRequestReceiver, + OutgoingRequest, Recipient, Requests, }, IfDisconnected, UnifiedReputationChange as Rep, }; @@ -992,7 +993,7 @@ impl AvailabilityRecoverySubsystem { } } in_req = recv_req => { - match in_req { + match in_req.into_nested().map_err(|fatal| SubsystemError::with_origin("availability-recovery", fatal))? { Ok(req) => { match query_full_data(&mut ctx, req.payload.candidate_hash).await { Ok(res) => { @@ -1009,11 +1010,10 @@ impl AvailabilityRecoverySubsystem { } } } - Err(incoming::Error::Fatal(f)) => return Err(SubsystemError::with_origin("availability-recovery", f)), - Err(incoming::Error::NonFatal(err)) => { + Err(jfyi) => { tracing::debug!( target: LOG_TARGET, - ?err, + error = ?jfyi, "Decoding incoming request failed" ); continue diff --git a/node/network/collator-protocol/Cargo.toml b/node/network/collator-protocol/Cargo.toml index e2220c8404e7..12a2fb79d3a1 100644 --- a/node/network/collator-protocol/Cargo.toml +++ b/node/network/collator-protocol/Cargo.toml @@ -6,10 +6,8 @@ edition = "2021" [dependencies] always-assert = "0.1.2" -derive_more = "0.99.17" futures = "0.3.21" futures-timer = "3" -thiserror = "1.0.30" tracing = "0.1.31" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } @@ -21,6 +19,8 @@ polkadot-node-network-protocol = { path = "../../network/protocol" } polkadot-node-primitives = { path = "../../primitives" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } +fatality = "0.0.6" +thiserror = "1.0.30" [dev-dependencies] log = "0.4.13" diff --git a/node/network/collator-protocol/src/collator_side/mod.rs b/node/network/collator-protocol/src/collator_side/mod.rs index 6834d6ffa4b7..1e13175bee7c 100644 --- a/node/network/collator-protocol/src/collator_side/mod.rs +++ b/node/network/collator-protocol/src/collator_side/mod.rs @@ -50,8 +50,9 @@ use polkadot_subsystem::{ overseer, FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext, }; -use super::{Result, LOG_TARGET}; -use crate::error::{log_error, Fatal, FatalResult, NonFatal}; +use super::LOG_TARGET; +use crate::error::{log_error, Error, FatalError, Result}; +use fatality::Split; #[cfg(test)] mod tests; @@ -762,7 +763,7 @@ where let statement = runtime .check_signature(ctx.sender(), relay_parent, statement) .await? - .map_err(NonFatal::InvalidStatementSignature)?; + .map_err(Error::InvalidStatementSignature)?; let removed = state.collation_result_senders.remove(&statement.payload().candidate_hash()); @@ -978,7 +979,7 @@ pub(crate) async fn run( collator_pair: CollatorPair, mut req_receiver: IncomingRequestReceiver, metrics: Metrics, -) -> FatalResult<()> +) -> std::result::Result<(), FatalError> where Context: SubsystemContext, Context: overseer::SubsystemContext, @@ -992,7 +993,7 @@ where let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse(); pin_mut!(recv_req); select! { - msg = ctx.recv().fuse() => match msg.map_err(Fatal::SubsystemReceive)? { + msg = ctx.recv().fuse() => match msg.map_err(FatalError::SubsystemReceive)? { FromOverseer::Communication { msg } => { log_error( process_msg(&mut ctx, &mut runtime, &mut state, msg).await, @@ -1032,11 +1033,11 @@ where "Handling incoming request" )?; } - Err(incoming::Error::Fatal(f)) => return Err(f.into()), - Err(incoming::Error::NonFatal(err)) => { + Err(error) => { + let jfyi = error.split().map_err(incoming::Error::from)?; tracing::debug!( target: LOG_TARGET, - ?err, + error = ?jfyi, "Decoding incoming request failed" ); continue diff --git a/node/network/collator-protocol/src/error.rs b/node/network/collator-protocol/src/error.rs index ff02d87dbaff..2606b44bdc50 100644 --- a/node/network/collator-protocol/src/error.rs +++ b/node/network/collator-protocol/src/error.rs @@ -17,8 +17,6 @@ //! Error handling related code and Error/Result definitions. -use thiserror::Error; - use polkadot_node_network_protocol::request_response::incoming; use polkadot_node_primitives::UncheckedSignedFullStatement; use polkadot_node_subsystem_util::runtime; @@ -28,80 +26,38 @@ use crate::LOG_TARGET; /// General result. pub type Result = std::result::Result; -/// Result with only fatal errors. -pub type FatalResult = std::result::Result; - -/// Errors for statement distribution. -#[derive(Debug, Error, derive_more::From)] -#[error(transparent)] -pub enum Error { - /// All fatal errors. - Fatal(Fatal), - /// All nonfatal/potentially recoverable errors. - NonFatal(NonFatal), -} -impl From for Error { - fn from(o: runtime::Error) -> Self { - match o { - runtime::Error::Fatal(f) => Self::Fatal(Fatal::Runtime(f)), - runtime::Error::NonFatal(f) => Self::NonFatal(NonFatal::Runtime(f)), - } - } -} +use fatality::Nested; -impl From for Error { - fn from(o: incoming::Error) -> Self { - match o { - incoming::Error::Fatal(f) => Self::Fatal(Fatal::IncomingRequest(f)), - incoming::Error::NonFatal(f) => Self::NonFatal(NonFatal::IncomingRequest(f)), - } - } -} - -/// Fatal runtime errors. -#[derive(Debug, Error)] -pub enum Fatal { - /// Receiving subsystem message from overseer failed. +#[allow(missing_docs)] +#[fatality::fatality(splitable)] +pub enum Error { + #[fatal] #[error("Receiving message from overseer failed")] - SubsystemReceive(#[source] SubsystemError), + SubsystemReceive(#[from] SubsystemError), - /// Errors coming from runtime::Runtime. - #[error("Error while accessing runtime information")] - Runtime(#[from] runtime::Fatal), - - /// Errors coming from receiving incoming requests. + #[fatal(forward)] #[error("Retrieving next incoming request failed")] - IncomingRequest(#[from] incoming::Fatal), -} - -/// Errors for fetching of runtime information. -#[derive(Debug, Error)] -pub enum NonFatal { - /// Signature was invalid on received statement. - #[error("CollationSeconded contained statement with invalid signature")] - InvalidStatementSignature(UncheckedSignedFullStatement), + IncomingRequest(#[from] incoming::Error), - /// Errors coming from runtime::Runtime. + #[fatal(forward)] #[error("Error while accessing runtime information")] - Runtime(#[from] runtime::NonFatal), + Runtime(#[from] runtime::Error), - /// Errors coming from receiving incoming requests. - #[error("Retrieving next incoming request failed")] - IncomingRequest(#[from] incoming::NonFatal), + #[error("CollationSeconded contained statement with invalid signature")] + InvalidStatementSignature(UncheckedSignedFullStatement), } /// Utility for eating top level errors and log them. /// /// We basically always want to try and continue on error. This utility function is meant to /// consume top-level errors by simply logging them. -pub fn log_error(result: Result<()>, ctx: &'static str) -> FatalResult<()> { - match result { - Err(Error::Fatal(f)) => Err(f), - Err(Error::NonFatal(error)) => { - tracing::warn!(target: LOG_TARGET, error = ?error, ctx); +pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), FatalError> { + match result.into_nested()? { + Ok(()) => Ok(()), + Err(jfyi) => { + tracing::warn!(target: LOG_TARGET, error = ?jfyi, ctx); Ok(()) }, - Ok(()) => Ok(()), } } diff --git a/node/network/collator-protocol/src/lib.rs b/node/network/collator-protocol/src/lib.rs index 0aa53156e759..b0f3d8b80fdb 100644 --- a/node/network/collator-protocol/src/lib.rs +++ b/node/network/collator-protocol/src/lib.rs @@ -17,7 +17,8 @@ //! The Collator Protocol allows collators and validators talk to each other. //! This subsystem implements both sides of the collator protocol. -#![deny(missing_docs, unused_crate_dependencies)] +#![deny(missing_docs)] +#![deny(unused_crate_dependencies)] #![recursion_limit = "256"] use std::time::Duration; @@ -39,7 +40,6 @@ use polkadot_subsystem::{ }; mod error; -use error::{FatalResult, Result}; mod collator_side; mod validator_side; @@ -98,7 +98,7 @@ impl CollatorProtocolSubsystem { Self { protocol_side } } - async fn run(self, ctx: Context) -> FatalResult<()> + async fn run(self, ctx: Context) -> std::result::Result<(), error::FatalError> where Context: overseer::SubsystemContext, Context: SubsystemContext, diff --git a/node/network/collator-protocol/src/validator_side/mod.rs b/node/network/collator-protocol/src/validator_side/mod.rs index 866062b052cd..0082b9190220 100644 --- a/node/network/collator-protocol/src/validator_side/mod.rs +++ b/node/network/collator-protocol/src/validator_side/mod.rs @@ -54,9 +54,9 @@ use polkadot_subsystem::{ overseer, FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext, SubsystemSender, }; -use crate::error::FatalResult; +use crate::error::Result; -use super::{modify_reputation, Result, LOG_TARGET}; +use super::{modify_reputation, LOG_TARGET}; #[cfg(test)] mod tests; @@ -1132,7 +1132,7 @@ pub(crate) async fn run( keystore: SyncCryptoStorePtr, eviction_policy: crate::CollatorEvictionPolicy, metrics: Metrics, -) -> FatalResult<()> +) -> std::result::Result<(), crate::error::FatalError> where Context: overseer::SubsystemContext, Context: SubsystemContext, diff --git a/node/network/dispute-distribution/Cargo.toml b/node/network/dispute-distribution/Cargo.toml index b2bb685a4184..f28b3b6489eb 100644 --- a/node/network/dispute-distribution/Cargo.toml +++ b/node/network/dispute-distribution/Cargo.toml @@ -19,6 +19,7 @@ sc-network = { git = "https://github.com/paritytech/substrate", branch = "master sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } thiserror = "1.0.30" +fatality = "0.0.6" lru = "0.7.2" [dev-dependencies] diff --git a/node/network/dispute-distribution/src/error.rs b/node/network/dispute-distribution/src/error.rs index 00ac06310e8d..4410c8fc6719 100644 --- a/node/network/dispute-distribution/src/error.rs +++ b/node/network/dispute-distribution/src/error.rs @@ -17,76 +17,54 @@ //! Error handling related code and Error/Result definitions. -use thiserror::Error; - use polkadot_node_subsystem_util::runtime; use polkadot_subsystem::SubsystemError; use crate::{sender, LOG_TARGET}; -#[derive(Debug, Error, derive_more::From)] -#[error(transparent)] -pub enum Error { - /// Fatal errors of dispute distribution. - Fatal(Fatal), - /// Non-fatal errors of dispute distribution. - NonFatal(NonFatal), -} - -impl From for Error { - fn from(o: sender::Error) -> Self { - match o { - sender::Error::Fatal(f) => Self::Fatal(Fatal::Sender(f)), - sender::Error::NonFatal(f) => Self::NonFatal(NonFatal::Sender(f)), - } - } -} +use fatality::Nested; -/// Fatal errors of this subsystem. -#[derive(Debug, Error)] -pub enum Fatal { +#[allow(missing_docs)] +#[fatality::fatality(splitable)] +pub enum Error { /// Receiving subsystem message from overseer failed. + #[fatal] #[error("Receiving message from overseer failed")] SubsystemReceive(#[source] SubsystemError), /// Spawning a running task failed. + #[fatal] #[error("Spawning subsystem task failed")] SpawnTask(#[source] SubsystemError), /// `DisputeSender` mpsc receiver exhausted. + #[fatal] #[error("Erasure chunk requester stream exhausted")] SenderExhausted, /// Errors coming from `runtime::Runtime`. + #[fatal(forward)] #[error("Error while accessing runtime information")] - Runtime(#[from] runtime::Fatal), - - /// Errors coming from `DisputeSender` - #[error("Error while accessing runtime information")] - Sender(#[from] sender::Fatal), -} + Runtime(#[from] runtime::Error), -/// Non-fatal errors of this subsystem. -#[derive(Debug, Error)] -pub enum NonFatal { /// Errors coming from `DisputeSender` + #[fatal(forward)] #[error("Error while accessing runtime information")] - Sender(#[from] sender::NonFatal), + Sender(#[from] sender::Error), } pub type Result = std::result::Result; -pub type FatalResult = std::result::Result; +pub type FatalResult = std::result::Result; /// Utility for eating top level errors and log them. /// /// We basically always want to try and continue on error. This utility function is meant to /// consume top-level errors by simply logging them -pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), Fatal> { - match result { - Err(Error::Fatal(f)) => Err(f), - Err(Error::NonFatal(error)) => { - tracing::warn!(target: LOG_TARGET, error = ?error, ctx); +pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), FatalError> { + match result.into_nested()? { + Err(jfyi) => { + tracing::warn!(target: LOG_TARGET, error = ?jfyi, ctx); Ok(()) }, Ok(()) => Ok(()), diff --git a/node/network/dispute-distribution/src/lib.rs b/node/network/dispute-distribution/src/lib.rs index 9b4a7a84c948..d20b3458694e 100644 --- a/node/network/dispute-distribution/src/lib.rs +++ b/node/network/dispute-distribution/src/lib.rs @@ -82,7 +82,7 @@ use self::receiver::DisputesReceiver; /// Error and [`Result`] type for this subsystem. mod error; -use error::{log_error, Fatal, FatalResult, Result}; +use error::{log_error, FatalError, FatalResult, Result}; #[cfg(test)] mod tests; @@ -160,7 +160,7 @@ where } /// Start processing work as passed on from the Overseer. - async fn run(mut self, mut ctx: Context) -> std::result::Result<(), Fatal> + async fn run(mut self, mut ctx: Context) -> std::result::Result<(), FatalError> where Context: SubsystemContext + overseer::SubsystemContext @@ -176,7 +176,7 @@ where self.metrics.clone(), ); ctx.spawn("disputes-receiver", receiver.run().boxed()) - .map_err(Fatal::SpawnTask)?; + .map_err(FatalError::SpawnTask)?; loop { let message = MuxedMessage::receive(&mut ctx, &mut self.sender_rx).await; @@ -197,7 +197,7 @@ where }, MuxedMessage::Sender(result) => { self.disputes_sender - .on_task_message(result.ok_or(Fatal::SenderExhausted)?) + .on_task_message(result.ok_or(FatalError::SenderExhausted)?) .await; }, } @@ -254,7 +254,7 @@ impl MuxedMessage { let from_overseer = ctx.recv().fuse(); futures::pin_mut!(from_overseer, from_sender); futures::select!( - msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(Fatal::SubsystemReceive)), + msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)), msg = from_sender.next() => MuxedMessage::Sender(msg), ) } diff --git a/node/network/dispute-distribution/src/receiver/error.rs b/node/network/dispute-distribution/src/receiver/error.rs index bde8899494dc..6dc69c500bd2 100644 --- a/node/network/dispute-distribution/src/receiver/error.rs +++ b/node/network/dispute-distribution/src/receiver/error.rs @@ -17,100 +17,55 @@ //! Error handling related code and Error/Result definitions. -use thiserror::Error; +use fatality::Nested; use polkadot_node_network_protocol::{request_response::incoming, PeerId}; use polkadot_node_subsystem_util::runtime; use crate::LOG_TARGET; -#[derive(Debug, Error, derive_more::From)] -#[error(transparent)] +#[allow(missing_docs)] +#[fatality::fatality(splitable)] pub enum Error { - /// All fatal errors. - Fatal(Fatal), - /// All nonfatal/potentially recoverable errors. - NonFatal(NonFatal), -} - -impl From for Error { - fn from(o: runtime::Error) -> Self { - match o { - runtime::Error::Fatal(f) => Self::Fatal(Fatal::Runtime(f)), - runtime::Error::NonFatal(f) => Self::NonFatal(NonFatal::Runtime(f)), - } - } -} - -impl From for Error { - fn from(o: incoming::Error) -> Self { - match o { - incoming::Error::Fatal(f) => Self::Fatal(Fatal::IncomingRequest(f)), - incoming::Error::NonFatal(f) => Self::NonFatal(NonFatal::IncomingRequest(f)), - } - } -} - -/// Fatal errors of this subsystem. -#[derive(Debug, Error)] -pub enum Fatal { - /// Errors coming from runtime::Runtime. + #[fatal(forward)] #[error("Error while accessing runtime information")] - Runtime(#[from] runtime::Fatal), + Runtime(#[from] runtime::Error), - /// Errors coming from receiving incoming requests. + #[fatal(forward)] #[error("Retrieving next incoming request failed.")] - IncomingRequest(#[from] incoming::Fatal), -} + IncomingRequest(#[from] incoming::Error), -/// Non-fatal errors of this subsystem. -#[derive(Debug, Error)] -pub enum NonFatal { - /// Answering request failed. #[error("Sending back response to peer {0} failed.")] SendResponse(PeerId), - /// Setting reputation for peer failed. #[error("Changing peer's ({0}) reputation failed.")] SetPeerReputation(PeerId), - /// Peer sent us request with invalid signature. #[error("Dispute request with invalid signatures, from peer {0}.")] InvalidSignature(PeerId), - /// Import oneshot got canceled. #[error("Import of dispute got canceled for peer {0} - import failed for some reason.")] ImportCanceled(PeerId), - /// Non validator tried to participate in dispute. - #[error("Peer {0} is not a validator.")] + #[error("Peer {0} attempted to participate in dispute and is not a validator.")] NotAValidator(PeerId), - - /// Errors coming from runtime::Runtime. - #[error("Error while accessing runtime information")] - Runtime(#[from] runtime::NonFatal), - - /// Errors coming from receiving incoming requests. - #[error("Retrieving next incoming request failed.")] - IncomingRequest(#[from] incoming::NonFatal), } pub type Result = std::result::Result; -pub type NonFatalResult = std::result::Result; +pub type JfyiErrorResult = std::result::Result; /// Utility for eating top level errors and log them. /// /// We basically always want to try and continue on error. This utility function is meant to /// consume top-level errors by simply logging them. -pub fn log_error(result: Result<()>) -> std::result::Result<(), Fatal> { - match result { - Err(Error::Fatal(f)) => Err(f), - Err(Error::NonFatal(error @ NonFatal::ImportCanceled(_))) => { +pub fn log_error(result: Result<()>) -> std::result::Result<(), FatalError> { + match result.into_nested()? { + Err(error @ JfyiError::ImportCanceled(_)) => { tracing::debug!(target: LOG_TARGET, error = ?error); Ok(()) }, - Err(Error::NonFatal(error)) => { + Err(error) => { tracing::warn!(target: LOG_TARGET, error = ?error); Ok(()) }, diff --git a/node/network/dispute-distribution/src/receiver/mod.rs b/node/network/dispute-distribution/src/receiver/mod.rs index 03d242c71999..7166b24d3d55 100644 --- a/node/network/dispute-distribution/src/receiver/mod.rs +++ b/node/network/dispute-distribution/src/receiver/mod.rs @@ -32,7 +32,7 @@ use lru::LruCache; use polkadot_node_network_protocol::{ authority_discovery::AuthorityDiscovery, request_response::{ - incoming::{OutgoingResponse, OutgoingResponseSender}, + incoming::{self, OutgoingResponse, OutgoingResponseSender}, v1::{DisputeRequest, DisputeResponse}, IncomingRequest, IncomingRequestReceiver, }, @@ -51,7 +51,7 @@ use crate::{ }; mod error; -use self::error::{log_error, NonFatal, NonFatalResult, Result}; +use self::error::{log_error, JfyiError, JfyiErrorResult, Result}; const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Received message could not be decoded."); const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Signatures were invalid."); @@ -101,7 +101,7 @@ enum MuxedMessage { /// - We need to make sure responses are actually sent (therefore we need to await futures /// promptly). /// - We need to update `banned_peers` accordingly to the result. - ConfirmedImport(NonFatalResult<(PeerId, ImportStatementsResult)>), + ConfirmedImport(JfyiErrorResult<(PeerId, ImportStatementsResult)>), /// A new request has arrived and should be handled. NewRequest(IncomingRequest), @@ -117,7 +117,7 @@ impl MuxedMessage { pin_mut!(next_req); if let Poll::Ready(r) = next_req.poll(ctx) { return match r { - Err(e) => Poll::Ready(Err(e.into())), + Err(e) => Poll::Ready(Err(incoming::Error::from(e).into())), Ok(v) => Poll::Ready(Ok(Self::NewRequest(v))), } } @@ -204,9 +204,9 @@ where reputation_changes: vec![COST_NOT_A_VALIDATOR], sent_feedback: None, }) - .map_err(|_| NonFatal::SendResponse(peer))?; + .map_err(|_| JfyiError::SendResponse(peer))?; - return Err(NonFatal::NotAValidator(peer).into()) + return Err(JfyiError::NotAValidator(peer).into()) } // Immediately drop requests from peers that already have requests in flight or have @@ -255,9 +255,9 @@ where reputation_changes: vec![COST_INVALID_SIGNATURE], sent_feedback: None, }) - .map_err(|_| NonFatal::SetPeerReputation(peer))?; + .map_err(|_| JfyiError::SetPeerReputation(peer))?; - return Err(From::from(NonFatal::InvalidSignature(peer))) + return Err(From::from(JfyiError::InvalidSignature(peer))) }, Ok(votes) => votes, }; @@ -285,8 +285,8 @@ where /// In addition we report import metrics. fn ban_bad_peer( &mut self, - result: NonFatalResult<(PeerId, ImportStatementsResult)>, - ) -> NonFatalResult<()> { + result: JfyiErrorResult<(PeerId, ImportStatementsResult)>, + ) -> JfyiErrorResult<()> { match result? { (_, ImportStatementsResult::ValidImport) => { self.metrics.on_imported(SUCCEEDED); @@ -303,7 +303,8 @@ where /// Manage pending imports in a way that preserves invariants. struct PendingImports { /// Futures in flight. - futures: FuturesUnordered)>>, + futures: + FuturesUnordered)>>, /// Peers whose requests are currently in flight. peers: HashSet, } @@ -341,7 +342,7 @@ impl PendingImports { } impl Stream for PendingImports { - type Item = NonFatalResult<(PeerId, ImportStatementsResult)>; + type Item = JfyiErrorResult<(PeerId, ImportStatementsResult)>; fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { match Pin::new(&mut self.futures).poll_next(ctx) { Poll::Pending => Poll::Pending, @@ -368,8 +369,8 @@ async fn respond_to_request( peer: PeerId, handled: oneshot::Receiver, pending_response: OutgoingResponseSender, -) -> NonFatalResult { - let result = handled.await.map_err(|_| NonFatal::ImportCanceled(peer))?; +) -> JfyiErrorResult { + let result = handled.await.map_err(|_| JfyiError::ImportCanceled(peer))?; let response = match result { ImportStatementsResult::ValidImport => OutgoingResponse { @@ -386,7 +387,7 @@ async fn respond_to_request( pending_response .send_outgoing_response(response) - .map_err(|_| NonFatal::SendResponse(peer))?; + .map_err(|_| JfyiError::SendResponse(peer))?; Ok(result) } diff --git a/node/network/dispute-distribution/src/sender/error.rs b/node/network/dispute-distribution/src/sender/error.rs index 4961fc5685b1..32f2e2f1e0d5 100644 --- a/node/network/dispute-distribution/src/sender/error.rs +++ b/node/network/dispute-distribution/src/sender/error.rs @@ -17,46 +17,21 @@ //! Error handling related code and Error/Result definitions. -use thiserror::Error; - use polkadot_node_primitives::disputes::DisputeMessageCheckError; use polkadot_node_subsystem_util::runtime; use polkadot_subsystem::SubsystemError; -#[derive(Debug, Error, derive_more::From)] -#[error(transparent)] +#[allow(missing_docs)] +#[fatality::fatality(splitable)] pub enum Error { - /// All fatal errors. - Fatal(Fatal), - /// All nonfatal/potentially recoverable errors. - NonFatal(NonFatal), -} - -impl From for Error { - fn from(o: runtime::Error) -> Self { - match o { - runtime::Error::Fatal(f) => Self::Fatal(Fatal::Runtime(f)), - runtime::Error::NonFatal(f) => Self::NonFatal(NonFatal::Runtime(f)), - } - } -} - -/// Fatal errors of this subsystem. -#[derive(Debug, Error)] -#[error(transparent)] -pub enum Fatal { - /// Spawning a running task failed. + #[fatal] #[error("Spawning subsystem task failed")] SpawnTask(#[source] SubsystemError), - /// Errors coming from runtime::Runtime. + #[fatal(forward)] #[error("Error while accessing runtime information")] - Runtime(#[from] runtime::Fatal), -} + Runtime(#[from] runtime::Error), -/// Non-fatal errors of this subsystem. -#[derive(Debug, Error)] -pub enum NonFatal { /// We need available active heads for finding relevant authorities. #[error("No active heads available - needed for finding relevant authorities.")] NoActiveHeads, @@ -92,11 +67,7 @@ pub enum NonFatal { /// A statement's `ValidatorIndex` could not be looked up. #[error("ValidatorIndex of statement could not be found")] InvalidValidatorIndexFromCoordinator, - - /// Errors coming from runtime::Runtime. - #[error("Error while accessing runtime information")] - Runtime(#[from] runtime::NonFatal), } pub type Result = std::result::Result; -pub type NonFatalResult = std::result::Result; +pub type JfyiErrorResult = std::result::Result; diff --git a/node/network/dispute-distribution/src/sender/mod.rs b/node/network/dispute-distribution/src/sender/mod.rs index ee2e15b57648..0b0d90345d81 100644 --- a/node/network/dispute-distribution/src/sender/mod.rs +++ b/node/network/dispute-distribution/src/sender/mod.rs @@ -37,9 +37,9 @@ pub use send_task::TaskFinish; /// Error and [`Result`] type for sender mod error; -pub use error::{Error, Fatal, NonFatal, Result}; +pub use error::{Error, FatalError, JfyiError, Result}; -use self::error::NonFatalResult; +use self::error::JfyiErrorResult; use crate::{Metrics, LOG_TARGET}; /// The `DisputeSender` keeps track of all ongoing disputes we need to send statements out. @@ -208,7 +208,7 @@ impl DisputeSender { None } }) - .ok_or(NonFatal::NoActiveHeads)?; + .ok_or(JfyiError::NoActiveHeads)?; let info = runtime .get_session_info_by_index(ctx.sender(), *ref_head, session_index) @@ -243,11 +243,12 @@ impl DisputeSender { let (valid_vote, invalid_vote) = if let Some(our_valid_vote) = our_valid_vote { // Get some invalid vote as well: - let invalid_vote = votes.invalid.get(0).ok_or(NonFatal::MissingVotesFromCoordinator)?; + let invalid_vote = + votes.invalid.get(0).ok_or(JfyiError::MissingVotesFromCoordinator)?; (our_valid_vote, invalid_vote) } else if let Some(our_invalid_vote) = our_invalid_vote { // Get some valid vote as well: - let valid_vote = votes.valid.get(0).ok_or(NonFatal::MissingVotesFromCoordinator)?; + let valid_vote = votes.valid.get(0).ok_or(JfyiError::MissingVotesFromCoordinator)?; (valid_vote, our_invalid_vote) } else { // There is no vote from us yet - nothing to do. @@ -258,7 +259,7 @@ impl DisputeSender { .session_info .validators .get(valid_index.0 as usize) - .ok_or(NonFatal::InvalidStatementFromCoordinator)?; + .ok_or(JfyiError::InvalidStatementFromCoordinator)?; let valid_signed = SignedDisputeStatement::new_checked( DisputeStatement::Valid(kind.clone()), candidate_hash, @@ -266,14 +267,14 @@ impl DisputeSender { valid_public.clone(), signature.clone(), ) - .map_err(|()| NonFatal::InvalidStatementFromCoordinator)?; + .map_err(|()| JfyiError::InvalidStatementFromCoordinator)?; let (kind, invalid_index, signature) = invalid_vote; let invalid_public = info .session_info .validators .get(invalid_index.0 as usize) - .ok_or(NonFatal::InvalidValidatorIndexFromCoordinator)?; + .ok_or(JfyiError::InvalidValidatorIndexFromCoordinator)?; let invalid_signed = SignedDisputeStatement::new_checked( DisputeStatement::Invalid(kind.clone()), candidate_hash, @@ -281,7 +282,7 @@ impl DisputeSender { invalid_public.clone(), signature.clone(), ) - .map_err(|()| NonFatal::InvalidValidatorIndexFromCoordinator)?; + .map_err(|()| JfyiError::InvalidValidatorIndexFromCoordinator)?; // Reconstructing the checked signed dispute statements is hardly useful here and wasteful, // but I don't want to enable a bypass for the below smart constructor and this code path @@ -297,7 +298,7 @@ impl DisputeSender { votes.candidate_receipt, &info.session_info, ) - .map_err(NonFatal::InvalidDisputeFromCoordinator)?; + .map_err(JfyiError::InvalidDisputeFromCoordinator)?; // Finally, get the party started: self.start_sender(ctx, runtime, message).await @@ -341,13 +342,13 @@ async fn get_active_session_indices( /// Retrieve Set of active disputes from the dispute coordinator. async fn get_active_disputes( ctx: &mut Context, -) -> NonFatalResult> { +) -> JfyiErrorResult> { let (tx, rx) = oneshot::channel(); ctx.send_message(AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ActiveDisputes( tx, ))) .await; - rx.await.map_err(|_| NonFatal::AskActiveDisputesCanceled) + rx.await.map_err(|_| JfyiError::AskActiveDisputesCanceled) } /// Get all locally available dispute votes for a given dispute. @@ -355,7 +356,7 @@ async fn get_candidate_votes( ctx: &mut Context, session_index: SessionIndex, candidate_hash: CandidateHash, -) -> NonFatalResult> { +) -> JfyiErrorResult> { let (tx, rx) = oneshot::channel(); ctx.send_message(AllMessages::DisputeCoordinator( DisputeCoordinatorMessage::QueryCandidateVotes(vec![(session_index, candidate_hash)], tx), @@ -363,5 +364,5 @@ async fn get_candidate_votes( .await; rx.await .map(|v| v.get(0).map(|inner| inner.to_owned().2)) - .map_err(|_| NonFatal::AskCandidateVotesCanceled) + .map_err(|_| JfyiError::AskCandidateVotesCanceled) } diff --git a/node/network/dispute-distribution/src/sender/send_task.rs b/node/network/dispute-distribution/src/sender/send_task.rs index ed4aab74939a..7a7c4755d4ac 100644 --- a/node/network/dispute-distribution/src/sender/send_task.rs +++ b/node/network/dispute-distribution/src/sender/send_task.rs @@ -35,7 +35,7 @@ use polkadot_subsystem::{ SubsystemContext, }; -use super::error::{Fatal, Result}; +use super::error::{FatalError, Result}; use crate::{ metrics::{FAILED, SUCCEEDED}, @@ -266,7 +266,7 @@ async fn send_requests( ); let (remote, remote_handle) = fut.remote_handle(); - ctx.spawn("dispute-sender", remote.boxed()).map_err(Fatal::SpawnTask)?; + ctx.spawn("dispute-sender", remote.boxed()).map_err(FatalError::SpawnTask)?; statuses.insert(receiver, DeliveryStatus::Pending(remote_handle)); } diff --git a/node/network/protocol/Cargo.toml b/node/network/protocol/Cargo.toml index 95f8e530f49c..3103856862a3 100644 --- a/node/network/protocol/Cargo.toml +++ b/node/network/protocol/Cargo.toml @@ -14,6 +14,6 @@ parity-scale-codec = { version = "3.0.0", default-features = false, features = [ sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" } strum = { version = "0.24", features = ["derive"] } -derive_more = "0.99.17" futures = "0.3.21" thiserror = "1.0.30" +fatality = "0.0.6" diff --git a/node/network/protocol/src/request_response/incoming/error.rs b/node/network/protocol/src/request_response/incoming/error.rs index d7ffe6b1fd4c..1bb7fa82effe 100644 --- a/node/network/protocol/src/request_response/incoming/error.rs +++ b/node/network/protocol/src/request_response/incoming/error.rs @@ -17,31 +17,17 @@ //! Error handling related code and Error/Result definitions. use sc_network::PeerId; -use thiserror::Error; use parity_scale_codec::Error as DecodingError; -/// Errors that happen during reception/decoding of incoming requests. -#[derive(Debug, Error, derive_more::From)] -#[error(transparent)] +#[allow(missing_docs)] +#[fatality::fatality(splitable)] pub enum Error { - /// All fatal errors. - Fatal(Fatal), - /// All nonfatal/potentially recoverable errors. - NonFatal(NonFatal), -} - -/// Fatal errors when receiving incoming requests. -#[derive(Debug, Error)] -pub enum Fatal { - /// Incoming request stream exhausted. Should only happen on shutdown. + // Incoming request stream exhausted. Should only happen on shutdown. + #[fatal] #[error("Incoming request channel got closed.")] RequestChannelExhausted, -} -/// Non-fatal errors when receiving incoming requests. -#[derive(Debug, Error)] -pub enum NonFatal { /// Decoding failed, we were able to change the peer's reputation accordingly. #[error("Decoding request failed for peer {0}.")] DecodingError(PeerId, #[source] DecodingError), diff --git a/node/network/protocol/src/request_response/incoming/mod.rs b/node/network/protocol/src/request_response/incoming/mod.rs index efc24babd23e..309ca32b0de4 100644 --- a/node/network/protocol/src/request_response/incoming/mod.rs +++ b/node/network/protocol/src/request_response/incoming/mod.rs @@ -29,7 +29,7 @@ use super::IsRequest; use crate::UnifiedReputationChange; mod error; -pub use error::{Error, Fatal, NonFatal, Result}; +pub use error::{Error, FatalError, JfyiError, Result}; /// A request coming in, including a sender for sending responses. /// @@ -84,7 +84,7 @@ where fn try_from_raw( raw: sc_network::config::IncomingRequest, reputation_changes: Vec, - ) -> std::result::Result { + ) -> std::result::Result { let sc_network::config::IncomingRequest { payload, peer, pending_response } = raw; let payload = match Req::decode(&mut payload.as_ref()) { Ok(payload) => payload, @@ -98,9 +98,9 @@ where }; if let Err(_) = pending_response.send(response) { - return Err(NonFatal::DecodingErrorNoReputationChange(peer, err)) + return Err(JfyiError::DecodingErrorNoReputationChange(peer, err)) } - return Err(NonFatal::DecodingError(peer, err)) + return Err(JfyiError::DecodingError(peer, err)) }, }; Ok(Self::new(peer, payload, pending_response)) @@ -224,7 +224,7 @@ where F: FnOnce() -> Vec, { let req = match self.raw.next().await { - None => return Err(Fatal::RequestChannelExhausted.into()), + None => return Err(FatalError::RequestChannelExhausted.into()), Some(raw) => IncomingRequest::::try_from_raw(raw, reputation_changes())?, }; Ok(req) diff --git a/node/network/protocol/src/request_response/outgoing.rs b/node/network/protocol/src/request_response/outgoing.rs index acb78d06d7b2..19aba34535d7 100644 --- a/node/network/protocol/src/request_response/outgoing.rs +++ b/node/network/protocol/src/request_response/outgoing.rs @@ -15,7 +15,6 @@ // along with Polkadot. If not, see . use futures::{channel::oneshot, prelude::Future}; -use thiserror::Error; use parity_scale_codec::{Decode, Encode, Error as DecodingError}; @@ -79,19 +78,19 @@ impl Requests { pub type ResponseSender = oneshot::Sender, network::RequestFailure>>; /// Any error that can occur when sending a request. -#[derive(Debug, Error)] +#[derive(Debug, thiserror::Error)] pub enum RequestError { /// Response could not be decoded. #[error("Response could not be decoded: {0}")] - InvalidResponse(#[source] DecodingError), + InvalidResponse(#[from] DecodingError), /// Some error in substrate/libp2p happened. #[error("{0}")] - NetworkError(#[source] network::RequestFailure), + NetworkError(#[from] network::RequestFailure), /// Response got canceled by networking. #[error("Response channel got canceled")] - Canceled(#[source] oneshot::Canceled), + Canceled(#[from] oneshot::Canceled), } impl RequestError { @@ -180,21 +179,3 @@ where let raw = rec.await??; Ok(Decode::decode(&mut raw.as_ref())?) } - -impl From for RequestError { - fn from(err: DecodingError) -> Self { - Self::InvalidResponse(err) - } -} - -impl From for RequestError { - fn from(err: network::RequestFailure) -> Self { - Self::NetworkError(err) - } -} - -impl From for RequestError { - fn from(err: oneshot::Canceled) -> Self { - Self::Canceled(err) - } -} diff --git a/node/network/statement-distribution/Cargo.toml b/node/network/statement-distribution/Cargo.toml index 5982393364cc..203fcacbb1f6 100644 --- a/node/network/statement-distribution/Cargo.toml +++ b/node/network/statement-distribution/Cargo.toml @@ -18,8 +18,8 @@ polkadot-node-network-protocol = { path = "../../network/protocol" } arrayvec = "0.5.2" indexmap = "1.8.0" parity-scale-codec = { version = "3.0.0", default-features = false, features = ["derive"] } -derive_more = "0.99.17" thiserror = "1.0.30" +fatality = "0.0.6" [dev-dependencies] polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } diff --git a/node/network/statement-distribution/src/error.rs b/node/network/statement-distribution/src/error.rs index ac997544b5c8..7ccd31a48fc7 100644 --- a/node/network/statement-distribution/src/error.rs +++ b/node/network/statement-distribution/src/error.rs @@ -22,88 +22,58 @@ use polkadot_node_subsystem_util::runtime; use polkadot_primitives::v1::{CandidateHash, Hash}; use polkadot_subsystem::SubsystemError; -use thiserror::Error; - use crate::LOG_TARGET; /// General result. pub type Result = std::result::Result; /// Result for non-fatal only failures. -pub type NonFatalResult = std::result::Result; +pub type JfyiErrorResult = std::result::Result; /// Result for fatal only failures. -pub type FatalResult = std::result::Result; +pub type FatalResult = std::result::Result; -/// Errors for statement distribution. -#[derive(Debug, Error, derive_more::From)] -#[error(transparent)] -pub enum Error { - /// Fatal errors of dispute distribution. - Fatal(Fatal), - /// Non-fatal errors of dispute distribution. - NonFatal(NonFatal), -} - -impl From for Error { - fn from(o: runtime::Error) -> Self { - match o { - runtime::Error::Fatal(f) => Self::Fatal(Fatal::Runtime(f)), - runtime::Error::NonFatal(f) => Self::NonFatal(NonFatal::Runtime(f)), - } - } -} +use fatality::Nested; -/// Fatal errors. -#[derive(Debug, Error)] -pub enum Fatal { - /// Requester channel is never closed. +#[allow(missing_docs)] +#[fatality::fatality(splitable)] +pub enum Error { + #[fatal] #[error("Requester receiver stream finished")] RequesterReceiverFinished, - /// Responder channel is never closed. + #[fatal] #[error("Responder receiver stream finished")] ResponderReceiverFinished, - /// Spawning a running task failed. + #[fatal] #[error("Spawning subsystem task failed")] SpawnTask(#[source] SubsystemError), - /// Receiving subsystem message from overseer failed. + #[fatal] #[error("Receiving message from overseer failed")] SubsystemReceive(#[source] SubsystemError), - /// Errors coming from runtime::Runtime. - #[error("Error while accessing runtime information")] - Runtime(#[from] runtime::Fatal), -} - -/// Errors for fetching of runtime information. -#[derive(Debug, Error)] -pub enum NonFatal { - /// Errors coming from runtime::Runtime. + #[fatal(forward)] #[error("Error while accessing runtime information")] - Runtime(#[from] runtime::NonFatal), + Runtime(#[from] runtime::Error), - /// Relay parent was not present in active heads. #[error("Relay parent could not be found in active heads")] NoSuchHead(Hash), - /// Received message from actually disconnected peer. #[error("Message from not connected peer")] NoSuchPeer(PeerId), - /// Peer requested statement data for candidate that was never announced to it. #[error("Peer requested data for candidate it never received a notification for (malicious?)")] RequestedUnannouncedCandidate(PeerId, CandidateHash), - /// A large statement status was requested, which could not be found. + // A large statement status was requested, which could not be found. #[error("Statement status does not exist")] NoSuchLargeStatementStatus(Hash, CandidateHash), - /// A fetched large statement was requested, but could not be found. + // A fetched large statement was requested, but could not be found. #[error("Fetched large statement does not exist")] NoSuchFetchedLargeStatement(Hash, CandidateHash), - /// Responder no longer waits for our data. (Should not happen right now.) + // Responder no longer waits for our data. (Should not happen right now.) #[error("Oneshot `GetData` channel closed")] ResponderGetDataCanceled, } @@ -112,14 +82,13 @@ pub enum NonFatal { /// /// We basically always want to try and continue on error. This utility function is meant to /// consume top-level errors by simply logging them. -pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), Fatal> { - match result { - Err(Error::Fatal(f)) => Err(f), - Err(Error::NonFatal(error)) => { - match error { - NonFatal::RequestedUnannouncedCandidate(_, _) => - tracing::warn!(target: LOG_TARGET, error = %error, ctx), - _ => tracing::debug!(target: LOG_TARGET, error = %error, ctx), +pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), FatalError> { + match result.into_nested()? { + Err(jfyi) => { + match jfyi { + JfyiError::RequestedUnannouncedCandidate(_, _) => + tracing::warn!(target: LOG_TARGET, error = %jfyi, ctx), + _ => tracing::debug!(target: LOG_TARGET, error = %jfyi, ctx), } Ok(()) }, diff --git a/node/network/statement-distribution/src/lib.rs b/node/network/statement-distribution/src/lib.rs index db1c022a94b5..9b425a770fa3 100644 --- a/node/network/statement-distribution/src/lib.rs +++ b/node/network/statement-distribution/src/lib.rs @@ -22,7 +22,7 @@ #![deny(unused_crate_dependencies)] #![warn(missing_docs)] -use error::{log_error, FatalResult, NonFatalResult}; +use error::{log_error, FatalResult, JfyiErrorResult}; use parity_scale_codec::Encode; use polkadot_node_network_protocol::{ @@ -62,8 +62,10 @@ use util::runtime::RuntimeInfo; use std::collections::{hash_map::Entry, HashMap, HashSet}; +use fatality::Nested; + mod error; -pub use error::{Error, Fatal, NonFatal, Result}; +pub use error::{Error, FatalError, JfyiError, Result}; /// Background task logic for requesting of large statements. mod requester; @@ -608,7 +610,7 @@ impl MuxedMessage { let from_responder = from_responder.next(); futures::pin_mut!(from_overseer, from_requester, from_responder); futures::select! { - msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(Fatal::SubsystemReceive)), + msg = from_overseer => MuxedMessage::Subsystem(msg.map_err(FatalError::SubsystemReceive)), msg = from_requester => MuxedMessage::Requester(msg), msg = from_responder => MuxedMessage::Responder(msg), } @@ -1548,7 +1550,7 @@ impl StatementDistributionSubsystem { mut self, mut ctx: (impl SubsystemContext + overseer::SubsystemContext), - ) -> std::result::Result<(), Fatal> { + ) -> std::result::Result<(), FatalError> { let mut peers: HashMap = HashMap::new(); let mut gossip_peers: HashSet = HashSet::new(); let mut authorities: HashMap = HashMap::new(); @@ -1569,7 +1571,7 @@ impl StatementDistributionSubsystem { ) .boxed(), ) - .map_err(Fatal::SpawnTask)?; + .map_err(FatalError::SpawnTask)?; loop { let message = @@ -1588,11 +1590,10 @@ impl StatementDistributionSubsystem { result?, ) .await; - match result { + match result.into_nested()? { Ok(true) => break, Ok(false) => {}, - Err(Error::Fatal(f)) => return Err(f), - Err(Error::NonFatal(error)) => tracing::debug!(target: LOG_TARGET, ?error), + Err(jfyi) => tracing::debug!(target: LOG_TARGET, error = ?jfyi), } }, MuxedMessage::Requester(result) => { @@ -1603,7 +1604,7 @@ impl StatementDistributionSubsystem { &mut peers, &mut active_heads, &req_sender, - result.ok_or(Fatal::RequesterReceiverFinished)?, + result.ok_or(FatalError::RequesterReceiverFinished)?, ) .await; log_error(result.map_err(From::from), "handle_requester_message")?; @@ -1613,7 +1614,7 @@ impl StatementDistributionSubsystem { .handle_responder_message( &peers, &mut active_heads, - result.ok_or(Fatal::ResponderReceiverFinished)?, + result.ok_or(FatalError::ResponderReceiverFinished)?, ) .await; log_error(result.map_err(From::from), "handle_responder_message")?; @@ -1629,7 +1630,7 @@ impl StatementDistributionSubsystem { peers: &HashMap, active_heads: &mut HashMap, message: ResponderMessage, - ) -> NonFatalResult<()> { + ) -> JfyiErrorResult<()> { match message { ResponderMessage::GetData { requesting_peer, relay_parent, candidate_hash, tx } => { if !requesting_peer_knows_about_candidate( @@ -1638,25 +1639,25 @@ impl StatementDistributionSubsystem { &relay_parent, &candidate_hash, )? { - return Err(NonFatal::RequestedUnannouncedCandidate( + return Err(JfyiError::RequestedUnannouncedCandidate( requesting_peer, candidate_hash, )) } let active_head = - active_heads.get(&relay_parent).ok_or(NonFatal::NoSuchHead(relay_parent))?; + active_heads.get(&relay_parent).ok_or(JfyiError::NoSuchHead(relay_parent))?; let committed = match active_head.waiting_large_statements.get(&candidate_hash) { Some(LargeStatementStatus::FetchedOrShared(committed)) => committed.clone(), _ => - return Err(NonFatal::NoSuchFetchedLargeStatement( + return Err(JfyiError::NoSuchFetchedLargeStatement( relay_parent, candidate_hash, )), }; - tx.send(committed).map_err(|_| NonFatal::ResponderGetDataCanceled)?; + tx.send(committed).map_err(|_| JfyiError::ResponderGetDataCanceled)?; }, } Ok(()) @@ -1670,7 +1671,7 @@ impl StatementDistributionSubsystem { active_heads: &mut HashMap, req_sender: &mpsc::Sender, message: RequesterMessage, - ) -> NonFatalResult<()> { + ) -> JfyiErrorResult<()> { match message { RequesterMessage::Finished { relay_parent, @@ -1686,7 +1687,7 @@ impl StatementDistributionSubsystem { let active_head = active_heads .get_mut(&relay_parent) - .ok_or(NonFatal::NoSuchHead(relay_parent))?; + .ok_or(JfyiError::NoSuchHead(relay_parent))?; let status = active_head.waiting_large_statements.remove(&candidate_hash); @@ -1697,7 +1698,7 @@ impl StatementDistributionSubsystem { return Ok(()) }, None => - return Err(NonFatal::NoSuchLargeStatementStatus( + return Err(JfyiError::NoSuchLargeStatementStatus( relay_parent, candidate_hash, )), @@ -1734,7 +1735,7 @@ impl StatementDistributionSubsystem { RequesterMessage::GetMorePeers { relay_parent, candidate_hash, tx } => { let active_head = active_heads .get_mut(&relay_parent) - .ok_or(NonFatal::NoSuchHead(relay_parent))?; + .ok_or(JfyiError::NoSuchHead(relay_parent))?; let status = active_head.waiting_large_statements.get_mut(&candidate_hash); @@ -1746,7 +1747,7 @@ impl StatementDistributionSubsystem { return Ok(()) }, None => - return Err(NonFatal::NoSuchLargeStatementStatus( + return Err(JfyiError::NoSuchLargeStatementStatus( relay_parent, candidate_hash, )), @@ -1836,7 +1837,7 @@ impl StatementDistributionSubsystem { .get_mut(&relay_parent) // This should never be out-of-sync with our view if the view // updates correspond to actual `StartWork` messages. - .ok_or(NonFatal::NoSuchHead(relay_parent))?; + .ok_or(JfyiError::NoSuchHead(relay_parent))?; active_head.waiting_large_statements.insert( statement.payload().candidate_hash(), LargeStatementStatus::FetchedOrShared(committed.clone()), @@ -1909,14 +1910,14 @@ fn requesting_peer_knows_about_candidate( requesting_peer: &PeerId, relay_parent: &Hash, candidate_hash: &CandidateHash, -) -> NonFatalResult { +) -> JfyiErrorResult { let peer_data = peers .get(requesting_peer) - .ok_or_else(|| NonFatal::NoSuchPeer(*requesting_peer))?; + .ok_or_else(|| JfyiError::NoSuchPeer(*requesting_peer))?; let knowledge = peer_data .view_knowledge .get(relay_parent) - .ok_or_else(|| NonFatal::NoSuchHead(*relay_parent))?; + .ok_or_else(|| JfyiError::NoSuchHead(*relay_parent))?; Ok(knowledge.sent_candidates.get(&candidate_hash).is_some()) } diff --git a/node/network/statement-distribution/src/responder.rs b/node/network/statement-distribution/src/responder.rs index 409e8a4d274c..74bcf679799b 100644 --- a/node/network/statement-distribution/src/responder.rs +++ b/node/network/statement-distribution/src/responder.rs @@ -20,9 +20,10 @@ use futures::{ SinkExt, StreamExt, }; +use fatality::Nested; use polkadot_node_network_protocol::{ request_response::{ - incoming::{self, OutgoingResponse}, + incoming::OutgoingResponse, v1::{StatementFetchingRequest, StatementFetchingResponse}, IncomingRequestReceiver, MAX_PARALLEL_STATEMENT_REQUESTS, }, @@ -74,16 +75,16 @@ pub async fn respond( pending_out.next().await; } - let req = match receiver.recv(|| vec![COST_INVALID_REQUEST]).await { - Err(incoming::Error::Fatal(f)) => { - tracing::debug!(target: LOG_TARGET, error = ?f, "Shutting down request responder"); + let req = match receiver.recv(|| vec![COST_INVALID_REQUEST]).await.into_nested() { + Ok(Ok(v)) => v, + Err(fatal) => { + tracing::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder"); return }, - Err(incoming::Error::NonFatal(err)) => { - tracing::debug!(target: LOG_TARGET, ?err, "Decoding request failed"); + Ok(Err(jfyi)) => { + tracing::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed"); continue }, - Ok(v) => v, }; let (tx, rx) = oneshot::channel(); diff --git a/node/subsystem-util/Cargo.toml b/node/subsystem-util/Cargo.toml index a9d3b69ac6bb..c3f335d4c152 100644 --- a/node/subsystem-util/Cargo.toml +++ b/node/subsystem-util/Cargo.toml @@ -13,6 +13,7 @@ parity-scale-codec = { version = "3.0.0", default-features = false, features = [ pin-project = "1.0.9" rand = "0.8.5" thiserror = "1.0.30" +fatality = "0.0.6" tracing = "0.1.31" derive_more = "0.99.17" lru = "0.7.2" @@ -38,4 +39,3 @@ log = "0.4.13" polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" } lazy_static = "1.4.0" polkadot-primitives-test-helpers = { path = "../../primitives/test-helpers" } - diff --git a/node/subsystem-util/src/runtime/error.rs b/node/subsystem-util/src/runtime/error.rs index 4cf169b48542..cfa532207953 100644 --- a/node/subsystem-util/src/runtime/error.rs +++ b/node/subsystem-util/src/runtime/error.rs @@ -18,34 +18,18 @@ //! Error handling related code and Error/Result definitions. use futures::channel::oneshot; -use thiserror::Error; use polkadot_node_subsystem::errors::RuntimeApiError; use polkadot_primitives::v1::SessionIndex; -pub type Result = std::result::Result; - -/// Errors for `Runtime` cache. -#[derive(Debug, Error, derive_more::From)] -#[error(transparent)] +#[allow(missing_docs)] +#[fatality::fatality(splitable)] pub enum Error { - /// All fatal errors. - Fatal(Fatal), - /// All nonfatal/potentially recoverable errors. - NonFatal(NonFatal), -} - -/// Fatal runtime errors. -#[derive(Debug, Error)] -pub enum Fatal { /// Runtime API subsystem is down, which means we're shutting down. + #[fatal] #[error("Runtime request got canceled")] RuntimeRequestCanceled(oneshot::Canceled), -} -/// Errors for fetching of runtime information. -#[derive(Debug, Error)] -pub enum NonFatal { /// Some request to the runtime failed. /// For example if we prune a block we're requesting info about. #[error("Runtime API error {0}")] @@ -56,13 +40,15 @@ pub enum NonFatal { NoSuchSession(SessionIndex), } +pub type Result = std::result::Result; + /// Receive a response from a runtime request and convert errors. pub(crate) async fn recv_runtime( r: oneshot::Receiver>, ) -> Result { let result = r .await - .map_err(Fatal::RuntimeRequestCanceled)? - .map_err(NonFatal::RuntimeRequest)?; + .map_err(FatalError::RuntimeRequestCanceled)? + .map_err(JfyiError::RuntimeRequest)?; Ok(result) } diff --git a/node/subsystem-util/src/runtime/mod.rs b/node/subsystem-util/src/runtime/mod.rs index d7afac0b58c2..46d33e639002 100644 --- a/node/subsystem-util/src/runtime/mod.rs +++ b/node/subsystem-util/src/runtime/mod.rs @@ -44,7 +44,7 @@ use crate::{ mod error; use error::{recv_runtime, Result}; -pub use error::{Error, Fatal, NonFatal}; +pub use error::{Error, FatalError, JfyiError}; /// Configuration for construction a `RuntimeInfo`. pub struct Config { @@ -169,7 +169,7 @@ impl RuntimeInfo { let session_info = recv_runtime(request_session_info(parent, session_index, sender).await) .await? - .ok_or(NonFatal::NoSuchSession(session_index))?; + .ok_or(JfyiError::NoSuchSession(session_index))?; let validator_info = self.get_validator_info(&session_info).await?; let full_info = ExtendedSessionInfo { session_info, validator_info }; diff --git a/tests/common.rs b/tests/common.rs index 0007bdc22066..7dd36f31d692 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -56,7 +56,7 @@ async fn wait_n_finalized_blocks_from(n: usize, url: &str) { let mut interval = tokio::time::interval(Duration::from_secs(6)); loop { - if let Ok(block) = get_finalized_head::(url.to_string()).await { + if let Ok(block) = get_finalized_head::(url).await { built_blocks.insert(block); if built_blocks.len() > n { break