Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
3d7d222
Initial network interface preparations
skunert Dec 21, 2021
b23547c
Implement get_storage_by_key
skunert Jan 4, 2022
cfaa211
Implement `validators` and `session_index_for_child`
skunert Jan 6, 2022
af362b4
Implement persisted_validation_data and candidate_pending_availability
skunert Jan 7, 2022
a8d80d5
Fix method name for persisted_validation_data and add encoded params
skunert Jan 7, 2022
482ecd4
Implement `retrieve_dmq_contents` and `retrieve_all_inbound_hrmp_chan…
skunert Jan 10, 2022
c03ba54
Implement `prove_read`
skunert Jan 10, 2022
b4ca285
Introduce separate RPC client, expose JsonRpSee errors
skunert Jan 10, 2022
f0d6c18
Simplify closure in call_remote_runtime_function
skunert Jan 10, 2022
d4dada5
Implement import stream, upgrade JsonRpSee
skunert Jan 12, 2022
dc5de78
Implement finality stream
skunert Jan 12, 2022
2675a7d
Remove unused method from interface
skunert Jan 12, 2022
f9a228d
Implement `is_major_syncing`
skunert Jan 12, 2022
7a89c73
Implement `wait_on_block`
skunert Jan 13, 2022
b92f204
Merge branch 'master' into network-interface-master
skunert Jan 13, 2022
532ba1d
Fix tests
skunert Jan 13, 2022
d49e25b
Unify error handling `ApiError`
skunert Jan 13, 2022
051068f
Replace WaitError with RelayChainError
skunert Jan 13, 2022
4b664a7
Wrap BlockChainError in RelayChainError
skunert Jan 13, 2022
40432e8
Unify error handling in relay chain intefaces
skunert Jan 17, 2022
5e03777
Fix return type of proof method
skunert Jan 17, 2022
d916d69
Improve error handling of new methods
skunert Jan 17, 2022
ee60f31
Improve error handling and move logging outside of interface
skunert Jan 18, 2022
0a1dd44
Clean up
skunert Jan 18, 2022
68fa074
Remove unwanted changes, clean up
skunert Jan 18, 2022
2764ea2
Remove unused import
skunert Jan 18, 2022
3be8d65
Add format for StatemachineError and remove nused From trait
skunert Jan 19, 2022
4c9b8ea
Use 'thiserror' crate to simplify error handling
skunert Jan 24, 2022
f2caf4b
Expose error for overseer, further simplify error handling
skunert Jan 24, 2022
7c95a65
Merge branch 'master' into relay-chain-interface-error-handling
skunert Jan 24, 2022
af4c50c
Reintroduce network interface
skunert Jan 19, 2022
f0212f9
Implement cli option
skunert Jan 20, 2022
c4fd3ae
Adjust call_state method to use hashes
skunert Jan 24, 2022
3a5e69b
Merge branch 'master' into skunert-network-interface
skunert Jan 28, 2022
2bf24bb
Disable PoV recovery when RPC is used
skunert Jan 28, 2022
3a4766b
Add integration test for network full node
skunert Feb 1, 2022
156f453
Use Hash instead of BlockId to ensure compatibility with RPC interface
skunert Feb 1, 2022
b2f5163
Fix cargo check warnings
skunert Feb 1, 2022
bf9293e
Implement retries
skunert Feb 3, 2022
cc8862f
Remove `expect` statements from code
skunert Feb 3, 2022
99535a0
Update jsonrpsee to 0.8.0 and make collator keys optional
skunert Feb 4, 2022
fcbd51e
Make cli arguments conflicting
skunert Feb 4, 2022
31e2ebf
Remove unused `block_status` method
skunert Feb 4, 2022
277a584
Add clippy fixes
skunert Feb 4, 2022
7af8820
Merge branch 'master' into skunert-network-interface
skunert Feb 7, 2022
8bd395d
Cargo fmt
skunert Feb 7, 2022
389777f
Validate relay chain rpc url
skunert Feb 7, 2022
53324c5
Clean up dependencies and add one more integration test
skunert Feb 7, 2022
470ce99
Clean up
skunert Feb 7, 2022
e39bc18
Clean up dependencies of relay-chain-network
skunert Feb 7, 2022
8bd74d5
Use hash instead of blockid for rpc methods
skunert Feb 7, 2022
a968c70
Fix tests
skunert Feb 7, 2022
332013b
Update client/cli/src/lib.rs
skunert Feb 8, 2022
f61cd81
Improve error message of cli validation
skunert Feb 8, 2022
ce0bd74
Add rpc client constructor
skunert Feb 8, 2022
2140165
Do not use debug formatting for errors
skunert Feb 9, 2022
2862b22
Merge branch 'master' into skunert-network-interface
skunert Feb 9, 2022
6c88d44
Improve logging for remote runtime methods
skunert Feb 17, 2022
1dc0e01
Merge branch 'master' into skunert-network-interface
skunert Feb 17, 2022
874dfb7
Only retry on transport problems
skunert Feb 21, 2022
28df792
Use PHash by value, rename test
skunert Feb 21, 2022
b6236a1
Improve tracing, return error on relay-chain-interface build
skunert Feb 21, 2022
c763d46
Fix naming, use generics instead of deserializing manually
skunert Feb 21, 2022
c502ae8
Rename RelayChainLocal and RelayChainNetwork
skunert Feb 21, 2022
43eb056
lock
skunert Feb 21, 2022
f15097c
Merge branch 'master' into skunert-network-interface
skunert Feb 21, 2022
ac455e9
Format
skunert Feb 21, 2022
69e5a38
Use impl trait for encodable runtime payload
skunert Feb 28, 2022
8d851cf
Only instantiate full node in tests when we need it
skunert Feb 28, 2022
f6965a2
Merge branch 'master' into skunert-network-interface
skunert Feb 28, 2022
f9aa4f0
Upgrade scale-codec to 3.0.0
skunert Mar 1, 2022
3587259
Improve expect log
skunert Mar 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Improve error handling and move logging outside of interface
  • Loading branch information
skunert committed Jan 18, 2022
commit ee60f314409b5288e9ffe2dd6b9f03ba7309e031
48 changes: 32 additions & 16 deletions client/consensus/common/src/parachain_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use async_trait::async_trait;
use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
Expand All @@ -34,6 +34,8 @@ use futures::{select, FutureExt, Stream, StreamExt};

use std::{pin::Pin, sync::Arc};

const LOG_TARGET: &str = "cumulus-consensus";

/// Helper for the relay chain client. This is expected to be a lightweight handle like an `Arc`.
#[async_trait]
pub trait RelaychainClient: Clone + 'static {
Expand All @@ -44,10 +46,10 @@ pub trait RelaychainClient: Clone + 'static {
type HeadStream: Stream<Item = Vec<u8>> + Send + Unpin;

/// Get a stream of new best heads for the given parachain.
async fn new_best_heads(&self, para_id: ParaId) -> Self::HeadStream;
async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream>;

/// Get a stream of finalized heads for the given parachain.
async fn finalized_heads(&self, para_id: ParaId) -> Self::HeadStream;
async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream>;

/// Returns the parachain head for the given `para_id` at the given block id.
async fn parachain_head_at(
Expand All @@ -68,7 +70,13 @@ where
R: RelaychainClient,
B: Backend<Block>,
{
let mut finalized_heads = relay_chain.finalized_heads(para_id).await;
let mut finalized_heads = match relay_chain.finalized_heads(para_id).await {
Ok(finalized_heads_stream) => finalized_heads_stream,
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
return
},
};

loop {
let finalized_head = if let Some(h) = finalized_heads.next().await {
Expand Down Expand Up @@ -167,7 +175,14 @@ async fn follow_new_best<P, R, Block, B>(
R: RelaychainClient,
B: Backend<Block>,
{
let mut new_best_heads = relay_chain.new_best_heads(para_id).await.fuse();
let mut new_best_heads = match relay_chain.new_best_heads(para_id).await {
Ok(best_heads_stream) => best_heads_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream.");
return
},
};

let mut imported_blocks = parachain.import_notification_stream().fuse();
// The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain
// block before the parachain block it included. In this case we need to wait for this block to
Expand Down Expand Up @@ -379,13 +394,12 @@ where

type HeadStream = Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;

async fn new_best_heads(&self, para_id: ParaId) -> Self::HeadStream {
async fn new_best_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
let relay_chain = self.clone();

//TODO: Error handling
self.new_best_notification_stream()
.await
.expect("")
let new_best_notification_stream = self
.new_best_notification_stream()
.await?
.filter_map(move |n| {
let relay_chain = relay_chain.clone();
async move {
Expand All @@ -396,15 +410,16 @@ where
.flatten()
}
})
.boxed()
.boxed();
Ok(new_best_notification_stream)
}

async fn finalized_heads(&self, para_id: ParaId) -> Self::HeadStream {
async fn finalized_heads(&self, para_id: ParaId) -> RelayChainResult<Self::HeadStream> {
let relay_chain = self.clone();

self.finality_notification_stream()
.await
.expect("")
let finality_notification_stream = self
.finality_notification_stream()
.await?
.filter_map(move |n| {
let relay_chain = relay_chain.clone();
async move {
Expand All @@ -415,7 +430,8 @@ where
.flatten()
}
})
.boxed()
.boxed();
Ok(finality_notification_stream)
}

async fn parachain_head_at(
Expand Down
10 changes: 5 additions & 5 deletions client/consensus/common/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::*;

use async_trait::async_trait;
use codec::Encode;
use cumulus_relay_chain_interface::RelayChainError;
use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
use cumulus_test_client::{
runtime::{Block, Header},
Backend, Client, InitBlockBuilder, TestClientBuilder, TestClientBuilderExt,
Expand Down Expand Up @@ -74,7 +74,7 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {

type HeadStream = Box<dyn Stream<Item = Vec<u8>> + Send + Unpin>;

async fn new_best_heads(&self, _: ParaId) -> Self::HeadStream {
async fn new_best_heads(&self, _: ParaId) -> RelayChainResult<Self::HeadStream> {
let stream = self
.inner
.lock()
Expand All @@ -83,10 +83,10 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
.take()
.expect("Should only be called once");

Box::new(stream.map(|v| v.encode()))
Ok(Box::new(stream.map(|v| v.encode())))
}

async fn finalized_heads(&self, _: ParaId) -> Self::HeadStream {
async fn finalized_heads(&self, _: ParaId) -> RelayChainResult<Self::HeadStream> {
let stream = self
.inner
.lock()
Expand All @@ -95,7 +95,7 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain {
.take()
.expect("Should only be called once");

Box::new(stream.map(|v| v.encode()))
Ok(Box::new(stream.map(|v| v.encode())))
}

async fn parachain_head_at(
Expand Down
2 changes: 1 addition & 1 deletion client/consensus/relay-chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ where
.propose(
inherent_data,
Default::default(),
//TODO: Fix this.
// TODO: Fix this.
Duration::from_millis(500),
// Set the block limit to 50% of the maximum PoV size.
//
Expand Down
4 changes: 2 additions & 2 deletions client/network/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl RelayChainInterface for DummyRelayChainInterface {
self.relay_backend
.blockchain()
.status(block_id)
.map_err(|err| RelayChainError::BlockchainError(err.to_string()))
.map_err(RelayChainError::BlockchainError)
}

async fn best_block_hash(&self) -> RelayChainResult<PHash> {
Expand Down Expand Up @@ -206,7 +206,7 @@ impl RelayChainInterface for DummyRelayChainInterface {
&self,
_: &polkadot_service::BlockId,
_: &Vec<Vec<u8>>,
) -> Result<Option<sc_client_api::StorageProof>, RelayChainError> {
) -> Result<sc_client_api::StorageProof, RelayChainError> {
unimplemented!("Not needed for test")
}

Expand Down
8 changes: 5 additions & 3 deletions client/relay-chain-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ pub enum RelayChainError {
)]
WaitBlockchainError(PHash, sp_blockchain::Error),
#[display(fmt = "Blockchain returned an error: {:?}", _0)]
BlockchainError(String),
BlockchainError(sp_blockchain::Error),
StateMachineError(String),
#[display(fmt = "Unspecified error occured: {:?}", _0)]
GenericError(String),
}
impl std::error::Error for RelayChainError {}

Expand Down Expand Up @@ -160,7 +162,7 @@ pub trait RelayChainInterface: Send + Sync {
&self,
block_id: &BlockId,
relevant_keys: &Vec<Vec<u8>>,
) -> Result<Option<StorageProof>, RelayChainError>;
) -> Result<StorageProof, RelayChainError>;
}

#[async_trait]
Expand Down Expand Up @@ -254,7 +256,7 @@ where
&self,
block_id: &BlockId,
relevant_keys: &Vec<Vec<u8>>,
) -> RelayChainResult<Option<StorageProof>> {
) -> RelayChainResult<StorageProof> {
(**self).prove_read(block_id, relevant_keys).await
}

Expand Down
66 changes: 12 additions & 54 deletions client/relay-chain-local/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use sp_consensus::SyncOracle;
use sp_core::{sp_std::collections::btree_map::BTreeMap, Pair};
use sp_state_machine::{Backend as StateBackend, StorageValue};

const LOG_TARGET: &str = "relay-chain-local";
/// The timeout in seconds after that the waiting for a block should be aborted.
const TIMEOUT_IN_SECONDS: u64 = 6;

Expand Down Expand Up @@ -100,15 +99,7 @@ where
sp_core::ExecutionContext::Importing,
para_id,
)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"An error occured during requesting the downward messages.",
);
RelayChainError::ApiError(e)
})
.map_err(RelayChainError::ApiError)
}

async fn retrieve_all_inbound_hrmp_channel_contents(
Expand All @@ -123,15 +114,7 @@ where
sp_core::ExecutionContext::Importing,
para_id,
)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"An error occured during requesting the inbound HRMP messages.",
);
RelayChainError::ApiError(e)
})
.map_err(RelayChainError::ApiError)
}

async fn persisted_validation_data(
Expand Down Expand Up @@ -202,7 +185,7 @@ where
self.backend
.blockchain()
.status(block_id)
.map_err(|err| RelayChainError::BlockchainError(err.to_string()))
.map_err(RelayChainError::BlockchainError)
}

async fn is_major_syncing(&self) -> RelayChainResult<bool> {
Expand All @@ -219,45 +202,20 @@ where
block_id: &BlockId,
key: &[u8],
) -> Result<Option<StorageValue>, RelayChainError> {
let state = self
.backend
.state_at(*block_id)
.map_err(|err| RelayChainError::BlockchainError(err.to_string()))?;
state.storage(key).map_err(RelayChainError::BlockchainError)
let state = self.backend.state_at(*block_id).map_err(RelayChainError::BlockchainError)?;
state.storage(key).map_err(RelayChainError::GenericError)
}

async fn prove_read(
&self,
block_id: &BlockId,
relevant_keys: &Vec<Vec<u8>>,
) -> Result<Option<StorageProof>, RelayChainError> {
let state_backend = self
.backend
.state_at(*block_id)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?block_id,
error = ?e,
"Cannot obtain the state of the relay chain.",
);
})
.ok();

match state_backend {
Some(state) => sp_state_machine::prove_read(state, relevant_keys)
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?block_id,
error = ?e,
"Failed to collect required relay chain state storage proof.",
);
RelayChainError::StateMachineError(e.to_string())
})
.map(Some),
None => Ok(None),
}
) -> Result<StorageProof, RelayChainError> {
let state_backend =
self.backend.state_at(*block_id).map_err(RelayChainError::BlockchainError)?;

sp_state_machine::prove_read(state_backend, relevant_keys)
.map_err(|e| RelayChainError::StateMachineError(e.to_string()))
}

/// Wait for a given relay chain block in an async way.
Expand Down Expand Up @@ -337,7 +295,7 @@ where
let block_id = BlockId::Hash(hash);
match backend.blockchain().status(block_id) {
Ok(BlockStatus::InChain) => return Ok(BlockCheckStatus::InChain),
Err(err) => return Err(RelayChainError::BlockchainError(err.to_string())),
Err(err) => return Err(RelayChainError::BlockchainError(err)),
_ => {},
}

Expand Down
2 changes: 0 additions & 2 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ where
.spawn_essential_handle()
.spawn("cumulus-consensus", None, consensus);

//TODO: error handling
let pov_recovery = cumulus_client_pov_recovery::PoVRecovery::new(
relay_chain_interface
.overseer_handle()
Expand All @@ -125,7 +124,6 @@ where
.spawn_essential_handle()
.spawn("cumulus-pov-recovery", None, pov_recovery.run());

//TODO: error handling
cumulus_client_collator::start_collator(cumulus_client_collator::StartCollatorParams {
runtime_api: client.clone(),
block_status,
Expand Down
2 changes: 1 addition & 1 deletion pallets/parachain-system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ pub mod pallet {
#[pallet::genesis_build]
impl<T: Config> GenesisBuild<T> for GenesisConfig {
fn build(&self) {
//TODO: Remove after https://github.com/paritytech/cumulus/issues/479
// TODO: Remove after https://github.com/paritytech/cumulus/issues/479
sp_io::storage::set(b":c", &[]);
}
}
Expand Down
33 changes: 29 additions & 4 deletions primitives/parachain-inherent/src/client_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,15 @@ async fn collect_relay_storage_proof(
relay_chain_interface
.prove_read(&relay_parent_block_id, &relevant_keys)
.await
.ok()?
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent_block_id,
error = ?e,
"Cannot obtain read proof from relay chain.",
);
})
.ok()
}

impl ParachainInherentData {
Expand All @@ -129,12 +137,29 @@ impl ParachainInherentData {
let relay_chain_state =
collect_relay_storage_proof(&relay_chain_interface, para_id, relay_parent).await?;

//TODO: error handling
let downward_messages =
relay_chain_interface.retrieve_dmq_contents(para_id, relay_parent).await.ok()?;
let downward_messages = relay_chain_interface
.retrieve_dmq_contents(para_id, relay_parent)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"An error occured during requesting the downward messages.",
);
})
.ok()?;
let horizontal_messages = relay_chain_interface
.retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent)
.await
.map_err(|e| {
tracing::error!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
error = ?e,
"An error occured during requesting the inbound HRMP messages.",
);
})
.ok()?;

Some(ParachainInherentData {
Expand Down