Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
977da03
Increase delay for pov-recovery
skunert Jan 25, 2023
dbef277
Update client/service/src/lib.rs
skunert Jan 25, 2023
72e7646
Comment
skunert Jan 25, 2023
190b012
FMT
skunert Jan 27, 2023
2278719
Merge branch 'master' into skunert/pov-recovery-increase-delay
skunert Jan 27, 2023
b63782f
Clear waiting_recovery when block is recovered or recovery failed
skunert Jan 31, 2023
3c660da
Introduce recovery queue that preserves insertion order
skunert Jan 31, 2023
8eaec6d
Better error logs
skunert Jan 31, 2023
8da6d8c
Decrease slot duration
skunert Feb 1, 2023
993dba5
Style improvements
skunert Feb 2, 2023
bfbae02
Add option to use unordered queue
skunert Feb 2, 2023
1139b80
Maintain cache of finalized blocks
skunert Feb 3, 2023
18e63a5
Wait for one relay chain slot before recovery
skunert Feb 3, 2023
c3ae356
Make retries testable
skunert Feb 3, 2023
508bee6
fmt
skunert Feb 3, 2023
d40ff11
Improve docs
skunert Feb 3, 2023
be9d523
Improve docs
skunert Feb 6, 2023
2bf0824
Simplify RecoveryQueue
skunert Feb 6, 2023
f744bec
Remove unwanted changes
skunert Feb 6, 2023
92c582b
Merge branch 'master' into skunert/experimental-pov-changes
skunert Feb 6, 2023
2512d09
Adjust to comments
skunert Feb 6, 2023
2af35d9
Apply suggestions from code review
skunert Feb 7, 2023
70ab63b
Move recovery delay into the queue
skunert Feb 8, 2023
0da8638
Check for finalized number
skunert Feb 8, 2023
6f0d8ef
Clean up
skunert Feb 8, 2023
3809eed
Use timer
skunert Feb 9, 2023
5bf500a
Simplify implementation
skunert Feb 8, 2023
49710ce
Revert "Use timer"
skunert Feb 9, 2023
78ccb2a
Properly clear `to_recover` flag
skunert Feb 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Maintain cache of finalized blocks
  • Loading branch information
skunert committed Feb 3, 2023
commit 1139b80dd355a1d902c6d45ce26d452aa8bdf754
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch =
cumulus-primitives-core = { path = "../../../primitives/core" }
cumulus-relay-chain-interface = { path = "../../relay-chain-interface" }
cumulus-client-pov-recovery = { path = "../../pov-recovery" }
schnellru = "0.2.1"

[dev-dependencies]
futures-timer = "3.0.2"
Expand Down
158 changes: 110 additions & 48 deletions client/consensus/common/src/parachain_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, UsageProvider,
};
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use schnellru::{ByLength, LruMap};
use sp_blockchain::Error as ClientError;
use sp_consensus::{BlockOrigin, BlockStatus};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
Expand All @@ -41,69 +42,130 @@ const LOG_TARGET: &str = "cumulus-consensus";
const RECOVERY_DELAY: RecoveryDelay =
RecoveryDelay { min: Duration::ZERO, max: Duration::from_secs(30) };

/// Handle a single finalized parachain head received from the relay chain.
///
/// Decodes the SCALE-encoded `finalized_head`, records its hash in the
/// recently-seen cache, and — unless the parachain already reports this block
/// as its finalized tip — attempts to finalize it on the parachain client.
/// Decode failures and finalization errors are logged and otherwise ignored.
fn handle_new_finalized_head<P, Block, B>(
	parachain: &Arc<P>,
	finalized_head: Vec<u8>,
	last_seen_finalized_hashes: &mut LruMap<Block::Hash, ()>,
) where
	Block: BlockT,
	B: Backend<Block>,
	P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
{
	let header = match Block::Header::decode(&mut &finalized_head[..]) {
		Ok(decoded) => decoded,
		Err(err) => {
			tracing::debug!(
				target: LOG_TARGET,
				error = ?err,
				"Could not decode parachain header while following finalized heads.",
			);
			return
		},
	};

	let hash = header.hash();

	// Remember this hash so that a late import of the same block (e.g. one
	// arriving via PoV recovery) can be finalized as soon as it shows up.
	last_seen_finalized_hashes.insert(hash, ());

	// Skip work if this block is already the parachain's finalized tip.
	if parachain.usage_info().chain.finalized_hash == hash {
		return
	}

	tracing::debug!(
		target: LOG_TARGET,
		block_hash = ?hash,
		"Attempting to finalize header.",
	);

	match parachain.finalize_block(hash, None, true) {
		Ok(_) => {},
		Err(ClientError::UnknownBlock(_)) => tracing::debug!(
			target: LOG_TARGET,
			block_hash = ?hash,
			"Could not finalize block because it is unknown.",
		),
		Err(e) => tracing::warn!(
			target: LOG_TARGET,
			error = ?e,
			block_hash = ?hash,
			"Failed to finalize block",
		),
	}
}

/// Follow the finalized head of the given parachain.
///
/// For every finalized block of the relay chain, it will get the included parachain header
/// corresponding to `para_id` and will finalize it in the parachain.
async fn follow_finalized_head<P, Block, B, R>(para_id: ParaId, parachain: Arc<P>, relay_chain: R)
where
Block: BlockT,
P: Finalizer<Block, B> + UsageProvider<Block>,
P: Finalizer<Block, B> + UsageProvider<Block> + BlockchainEvents<Block>,
R: RelayChainInterface + Clone,
B: Backend<Block>,
{
let finalized_heads = match finalized_heads(relay_chain, para_id).await {
Ok(finalized_heads_stream) => finalized_heads_stream,
Ok(finalized_heads_stream) => finalized_heads_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve finalized heads stream.");
return
},
};

let mut imported_blocks = parachain.import_notification_stream().fuse();

pin_mut!(finalized_heads);

loop {
let finalized_head = if let Some(h) = finalized_heads.next().await {
h
} else {
tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
return
};
// We use this cache to finalize blocks that are imported late.
// For example, a block that has been recovered via PoV-Recovery
// on a full node can have several minutes delay. With this cache
// we have some "memory" of recently finalized blocks.
let mut last_seen_finalized_hashes = LruMap::new(ByLength::new(40));

let header = match Block::Header::decode(&mut &finalized_head[..]) {
Ok(header) => header,
Err(err) => {
tracing::debug!(
target: LOG_TARGET,
error = ?err,
"Could not decode parachain header while following finalized heads.",
);
continue
loop {
select! {
fin = finalized_heads.next() => {
match fin {
Some(finalized_head) =>
handle_new_finalized_head(&parachain, finalized_head, &mut last_seen_finalized_hashes),
None => {
tracing::debug!(target: LOG_TARGET, "Stopping following finalized head.");
return
}
}
},
};

let hash = header.hash();

// don't finalize the same block multiple times.
if parachain.usage_info().chain.finalized_hash != hash {
tracing::debug!(
target: LOG_TARGET,
block_hash = ?hash,
"Attempting to finalize header.",
);
if let Err(e) = parachain.finalize_block(hash, None, true) {
match e {
ClientError::UnknownBlock(_) => tracing::debug!(
target: LOG_TARGET,
block_hash = ?hash,
"Could not finalize block because it is unknown.",
),
_ => tracing::warn!(
target: LOG_TARGET,
error = ?e,
block_hash = ?hash,
"Failed to finalize block",
),
imported = imported_blocks.next() => {
match imported {
Some(imported_block) => {
// When we see a block import that is already finalized, we immediately finalize it.
if last_seen_finalized_hashes.peek(&imported_block.hash).is_some() {
tracing::debug!(
target: LOG_TARGET,
"Setting newly imported block as finalized.",
);

if let Err(e) = parachain.finalize_block(imported_block.hash, None, true) {
match e {
ClientError::UnknownBlock(_) => tracing::debug!(
target: LOG_TARGET,
block_hash = ?imported_block.hash,
"Could not finalize block because it is unknown.",
),
_ => tracing::warn!(
target: LOG_TARGET,
error = ?e,
block_hash = ?imported_block.hash,
"Failed to finalize block",
),
}
}
}
},
None => {
tracing::debug!(
target: LOG_TARGET,
"Stopping following imported blocks.",
);
return
}
}
}
}
Expand Down Expand Up @@ -171,15 +233,15 @@ async fn follow_new_best<P, R, Block, B>(
R: RelayChainInterface + Clone,
B: Backend<Block>,
{
let new_best_relay_heads = match new_best_heads(relay_chain, para_id).await {
let new_best_heads = match new_best_heads(relay_chain, para_id).await {
Ok(best_relay_heads_stream) => best_relay_heads_stream.fuse(),
Err(err) => {
tracing::error!(target: LOG_TARGET, error = ?err, "Unable to retrieve best heads stream.");
return
},
};

pin_mut!(new_best_relay_heads);
pin_mut!(new_best_heads);

let mut imported_blocks = parachain.import_notification_stream().fuse();
// The unset best header of the parachain. Will be `Some(_)` when we have imported a relay chain
Expand All @@ -189,10 +251,10 @@ async fn follow_new_best<P, R, Block, B>(

loop {
select! {
relay_header = new_best_relay_heads.next() => {
match relay_header {
Some(relay_header) => handle_new_best_parachain_head(
relay_header,
h = new_best_heads.next() => {
match h {
Some(header) => handle_new_best_parachain_head(
header,
&*parachain,
&mut unset_best_header,
recovery_chan_tx.as_mut(),
Expand Down
2 changes: 1 addition & 1 deletion test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ where
para_id,
relay_chain_interface,
import_queue: import_queue_service,
relay_chain_slot_duration: Duration::from_millis(600),
relay_chain_slot_duration: Duration::from_secs(6),
};

start_full_node(params)?;
Expand Down
8 changes: 4 additions & 4 deletions zombienet/tests/0002-pov_recovery.feature
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ charlie: is up within 60 seconds
one: is up within 60 seconds
two: is up within 60 seconds

# wait 30 blocks and register parachain
validator-3: reports block height is at least 30 within 250 seconds
# wait 20 blocks and register parachain
validator-3: reports block height is at least 20 within 250 seconds
validator-0: js-script ./register-para.js with "2000" within 240 seconds
validator-0: parachain 2000 is registered within 300 seconds

# check block production
bob: reports block height is at least 20 within 600 seconds
alice: reports block height is at least 20 within 600 seconds
charlie: reports block height is at least 20 within 600 seconds
one: reports block height is at least 20 within 600 seconds
two: reports block height is at least 20 within 600 seconds
one: reports block height is at least 20 within 800 seconds
two: reports block height is at least 20 within 800 seconds