Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
ec1b613
rpc/chainhead: Test unpin for noncanonical prunned blocks
lexnv Feb 8, 2023
1236d59
rpc/tests: Ensure fork is not reported by the Finalized event
lexnv Feb 13, 2023
477029f
rpc/chainhead: Detect pruned forks to ignore from events
lexnv Feb 13, 2023
e3531a1
rpc/tests: Check unpin can be called on pruned hashes
lexnv Feb 13, 2023
5288cd9
Fix clippy
lexnv Feb 13, 2023
904840d
rpc/chain_head: Handle race with memory blocks and notifications
lexnv Feb 16, 2023
8dddb1e
rpc/chain_head: Add data config for the `follow` future
lexnv Feb 16, 2023
b9e8f6f
rpc/chain_head: Address feedback
lexnv Feb 16, 2023
a796034
rpc/chain_head: Move best block cache on the data config
lexnv Feb 16, 2023
1d0b4d2
rpc/chain_head: Send new events from the finalized stream
lexnv Feb 16, 2023
3bb66e4
rpc/chian_head: Report all pruned blocks
lexnv Feb 16, 2023
3847c74
rpc/chain_head: Move `chainHead_follow` logic on dedicated file
lexnv Feb 17, 2023
64876fa
rpc/chain_head: Delegate follow logic to `chain_head_follow`
lexnv Feb 17, 2023
f84c4ce
rpc/chain_head: Remove subscriptions on drop
lexnv Feb 17, 2023
908d1f2
rpc/tests: Ignore pruned blocks for a longer fork
lexnv Feb 17, 2023
01e98b0
rpc/tests: Check all pruned blocks are reported, not just stale heads
lexnv Feb 17, 2023
33f5f10
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pru…
lexnv Feb 17, 2023
eceeb92
rpc/tests: Remove println debug and fix indentation
lexnv Feb 17, 2023
b05f5fc
rpc/chain_head: Remove unnecessary trait bounds
lexnv Feb 17, 2023
425d6e7
rpc/chain_head: Add debug log for pruned forks
lexnv Feb 17, 2023
002acba
Revert "rpc/chain_head: Add debug log for pruned forks"
lexnv Feb 20, 2023
b585d8b
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pru…
lexnv Feb 21, 2023
04fbcff
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pru…
lexnv Feb 27, 2023
6a9c36e
Merge remote-tracking branch 'origin/master' into lexnv/chainhead_pru…
lexnv Feb 27, 2023
4140b5b
Adjust blockID for testing
lexnv Feb 27, 2023
aca8f3b
Update client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
lexnv Mar 1, 2023
6429e05
rpc/chain_head: Rename `ChainHeadFollow` to `ChainHeadFollower`
lexnv Mar 1, 2023
d3dd4a4
rpc/chain_head: Remove subscriptions manually
lexnv Mar 1, 2023
3786aa8
rpc/chain_head: Improve log messages by adding subID and errors
lexnv Mar 1, 2023
c24cdc6
rpc/chain_head: Ensure `follow` stops sending events on first error
lexnv Mar 1, 2023
bf85692
rpc/chain_head: Use default constructor
lexnv Mar 1, 2023
5ba5cd2
rpc/chain_head: Add `StartupPoint` structure
lexnv Mar 1, 2023
f0a40f0
rpc/chain_head: Rename `in_memory_blocks`
lexnv Mar 1, 2023
47853cb
rpc/chain_head: Fix comment typo
lexnv Mar 1, 2023
708aa0e
rpc/chain_head: Keep unique blocks and remove itertools
lexnv Mar 2, 2023
49245dc
rpc/chain_head: Make sure `bestBlocks` events are generated in order
lexnv Mar 2, 2023
095a39d
rpc/chain_head: Maintain order of reported blocks
lexnv Mar 2, 2023
ce35d02
rpc/chain_head: Parent of finalized block could be unpinned
lexnv Mar 2, 2023
08d2344
rpc/chain_head: Fix warning
lexnv Mar 2, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ tokio-stream = { version = "0.1", features = ["sync"] }
array-bytes = "4.1"
log = "0.4.17"
futures-util = { version = "0.3.19", default-features = false }
itertools = "0.10.3"

[dev-dependencies]
serde_json = "1.0"
Expand Down
133 changes: 100 additions & 33 deletions client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,30 @@ use futures::{
stream::{self, Stream, StreamExt},
};
use futures_util::future::Either;
use itertools::Itertools;
use jsonrpsee::{
core::{async_trait, RpcResult},
types::{SubscriptionEmptyError, SubscriptionId, SubscriptionResult},
SubscriptionSink,
};
use log::{debug, error};
use parking_lot::RwLock;
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, CallExecutor, ChildInfo,
ExecutorProvider, FinalityNotification, StorageKey, StorageProvider,
};
use serde::Serialize;
use sp_api::CallApiAt;
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
Backend as BlockChainBackend, Error as BlockChainError, HashAndNumber, HeaderBackend,
HeaderMetadata,
};
use sp_core::{hexdisplay::HexDisplay, storage::well_known_keys, Bytes};
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, Header},
};
use std::{marker::PhantomData, sync::Arc};

use std::{collections::HashSet, marker::PhantomData, sync::Arc};
/// An API for chain head RPC calls.
pub struct ChainHead<BE, Block: BlockT, Client> {
/// Substrate client.
Expand Down Expand Up @@ -126,16 +128,15 @@ impl<BE, Block: BlockT, Client> ChainHead<BE, Block, Client> {
///
/// This includes the "Initialized" event followed by the in-memory
/// blocks via "NewBlock" and the "BestBlockChanged".
fn generate_initial_events<Block, BE, Client>(
fn generate_initial_events<Block, Client>(
client: &Arc<Client>,
backend: &Arc<BE>,
handle: &SubscriptionHandle<Block>,
runtime_updates: bool,
initial_blocks: Vec<(Block::Hash, Block::Hash)>,
) -> Result<Vec<FollowEvent<Block::Hash>>, SubscriptionManagementError>
where
Block: BlockT + 'static,
Block::Header: Unpin,
BE: Backend<Block> + 'static,
Client: HeaderBackend<Block> + CallApiAt<Block> + 'static,
{
// The initialized event is the first one sent.
Expand All @@ -155,7 +156,6 @@ where
runtime_updates,
});

let initial_blocks = get_initial_blocks(&backend, finalized_block_hash);
let mut in_memory_blocks = Vec::with_capacity(initial_blocks.len() + 1);

in_memory_blocks.push(initialized_event);
Expand Down Expand Up @@ -250,33 +250,71 @@ where
}
}

/// Internal representation of the initial blocks that should be
/// reported or ignored by the chainHead.
#[derive(Clone, Debug)]
struct InitialBlocks<Block: BlockT> {
/// Children of the latest finalized block, for which the `NewBlock`
/// event must be generated.
///
/// It is a tuple of (block hash, parent hash).
in_memory: Vec<(Block::Hash, Block::Hash)>,
/// Blocks that should not be reported as pruned by the `Finalized` event.
///
/// The substrate database will perform the pruning of height N at
/// the finalization N + 1. We could have the following block tree
/// when the user subscribes to the `unfollow` method:
/// [A] - [A1] - [A2] - [A3]
/// ^^ finalized
/// - [A1] - [B1]
///
/// When the A3 block is finalized, B1 is reported as pruned, however
/// B1 was never reported as `NewBlock` (and as such was never pinned).
/// This is because the `NewBlock` events are generated for children of
/// the finalized hash.
pruned_forks: HashSet<Block::Hash>,
}

/// Get the in-memory blocks of the client, starting from the provided finalized hash.
///
/// Returns a tuple of block hash with parent hash.
fn get_initial_blocks<BE, Block>(
fn get_initial_blocks_with_forks<Block, BE>(
backend: &Arc<BE>,
parent_hash: Block::Hash,
) -> Vec<(Block::Hash, Block::Hash)>
finalized: HashAndNumber<Block>,
) -> Result<InitialBlocks<Block>, SubscriptionManagementError>
where
Block: BlockT + 'static,
BE: Backend<Block> + 'static,
{
let mut result = Vec::new();
let mut next_hash = Vec::new();
next_hash.push(parent_hash);

while let Some(parent_hash) = next_hash.pop() {
let Ok(blocks) = backend.blockchain().children(parent_hash) else {
continue
};

for child_hash in blocks {
result.push((child_hash, parent_hash));
next_hash.push(child_hash);
let blockchain = backend.blockchain();
let leaves = blockchain
.leaves()
.map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;
let finalized_number = finalized.number;
let finalized = finalized.hash;
let mut pruned_forks = HashSet::new();
let mut in_memory = Vec::new();
for leaf in leaves {
let tree_route = sp_blockchain::tree_route(blockchain, finalized, leaf)
.map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?;

// When the common block number is smaller than the current finalized
// block, the tree route is a fork that should be pruned.
let common = tree_route.common_block();
let number = common.number;
let blocks = tree_route.enacted().iter().map(|block| block.hash);
if number < finalized_number {
pruned_forks.extend(blocks);
} else if number == finalized_number {
// Ensure a `NewBlock` event is generated for all children of the
// finalized block. Describe the tree route as (child_node, parent_node)
let parents = std::iter::once(finalized).chain(blocks.clone());
in_memory.extend(blocks.zip(parents));
}
}

result
// Keep unique blocks.
let in_memory: Vec<_> = in_memory.into_iter().unique().collect();

Ok(InitialBlocks { in_memory, pruned_forks })
}

/// Submit the events from the provided stream to the RPC client
Expand Down Expand Up @@ -374,6 +412,7 @@ fn handle_finalized_blocks<Client, Block>(
client: &Arc<Client>,
handle: &SubscriptionHandle<Block>,
notification: FinalityNotification<Block>,
pruned_forks: &Arc<RwLock<HashSet<Block::Hash>>>,
) -> Result<(FollowEvent<Block::Hash>, Option<FollowEvent<Block::Hash>>), SubscriptionManagementError>
where
Block: BlockT + 'static,
Expand All @@ -389,12 +428,30 @@ where
let mut finalized_block_hashes = notification.tree_route.iter().cloned().collect::<Vec<_>>();
finalized_block_hashes.push(last_finalized);

let pruned_block_hashes: Vec<_> = notification.stale_heads.iter().cloned().collect();
// Report all pruned blocks from the notification that are not
// part of the `pruned_forks`.
let pruned_block_hashes: Vec<_> = {
let mut to_ignore = pruned_forks.write();

notification
.stale_heads
.iter()
.cloned()
.filter(|hash| {
if !to_ignore.contains(&hash) {
return true
}

let finalized_event = FollowEvent::Finalized(Finalized {
finalized_block_hashes,
pruned_block_hashes: pruned_block_hashes.clone(),
});
to_ignore.remove(&hash);
false
})
.collect()
};

let finalized_event =
FollowEvent::Finalized(Finalized { finalized_block_hashes, pruned_block_hashes });

let pruned_block_hashes: Vec<_> = notification.stale_heads.iter().cloned().collect();

let mut best_block_cache = handle.best_block_write();
match *best_block_cache {
Expand Down Expand Up @@ -485,6 +542,17 @@ where
};
debug!(target: "rpc-spec-v2", "[follow][id={:?}] Subscription accepted", sub_id);

// Get all block hashes from the node's memory.
let info = self.client.info();
let finalized = HashAndNumber { hash: info.finalized_hash, number: info.finalized_number };
let Ok(InitialBlocks { in_memory, pruned_forks}) = get_initial_blocks_with_forks(&self.backend, finalized) else {
// Note: This could be warp sync error.
debug!(target: "rpc-spec-v2", "[follow][id={:?}] Failed to get all in-memory blocks", sub_id);
let _ = sink.send(&FollowEvent::<Block::Hash>::Stop);
return Ok(())
};
let pruned_forks = Arc::new(RwLock::new(pruned_forks));

let client = self.client.clone();
let handle = sub_handle.clone();
let subscription_id = sub_id.clone();
Expand Down Expand Up @@ -513,7 +581,7 @@ where
.client
.finality_notification_stream()
.map(move |notification| {
match handle_finalized_blocks(&client, &handle, notification) {
match handle_finalized_blocks(&client, &handle, notification, &pruned_forks) {
Ok((finalized_event, None)) => stream::iter(vec![finalized_event]),
Ok((finalized_event, Some(best_block))) =>
stream::iter(vec![best_block, finalized_event]),
Expand All @@ -529,9 +597,8 @@ where
let merged = tokio_stream::StreamExt::merge(stream_import, stream_finalized);
let subscriptions = self.subscriptions.clone();
let client = self.client.clone();
let backend = self.backend.clone();
let fut = async move {
let Ok(initial_events) = generate_initial_events(&client, &backend, &sub_handle, runtime_updates) else {
let Ok(initial_events) = generate_initial_events(&client, &sub_handle, runtime_updates, in_memory) else {
// Stop the subscription if we exceeded the maximum number of blocks pinned.
debug!(target: "rpc-spec-v2", "[follow][id={:?}] Exceeded max pinned blocks from initial events", sub_id);
let _ = sink.send(&FollowEvent::<Block::Hash>::Stop);
Expand Down
105 changes: 105 additions & 0 deletions client/rpc-spec-v2/src/chain_head/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1019,4 +1019,109 @@ async fn follow_prune_best_block() {
pruned_block_hashes: vec![format!("{:?}", block_2_hash)],
});
assert_eq!(event, expected);

// Pruned hash can be unpinned.
let sub_id = sub.subscription_id();
let sub_id = serde_json::to_string(&sub_id).unwrap();
let hash = format!("{:?}", block_2_hash);
let _res: () = api.call("chainHead_unstable_unpin", [&sub_id, &hash]).await.unwrap();
}

#[tokio::test]
async fn follow_forks_pruned_block() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut client = Arc::new(builder.build());

let api = ChainHead::new(
client.clone(),
backend,
Arc::new(TaskExecutor::default()),
CHAIN_GENESIS,
MAX_PINNED_BLOCKS,
)
.into_rpc();

// Block tree before the subscription:
//
// finalized -> block 1 -> block 2
// ^^^ finalized
// -> block 1 -> block 3
//

let block_1 = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block_1.clone()).await.unwrap();

let block_2 = client.new_block(Default::default()).unwrap().build().unwrap().block;
let block_2_hash = block_2.header.hash();
client.import(BlockOrigin::Own, block_2.clone()).await.unwrap();

// Block 3 with parent Block 1 is not the best imported.
let mut block_builder = client
.new_block_at(&BlockId::Hash(block_1.header.hash()), Default::default(), false)
.unwrap();
// This push is required as otherwise block 3 has the same hash as block 2 and won't get
// imported
block_builder
.push_transfer(Transfer {
from: AccountKeyring::Alice.into(),
to: AccountKeyring::Ferdie.into(),
amount: 41,
nonce: 0,
})
.unwrap();
let block_3 = block_builder.build().unwrap().block;
client.import(BlockOrigin::Own, block_3.clone()).await.unwrap();

// Block 3 is not pruned, pruning happens at height (N - 1).
client.finalize_block(block_2_hash, None).unwrap();

let mut sub = api.subscribe("chainHead_unstable_follow", [false]).await.unwrap();

// Initialized must always be reported first.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
println!("Event: {:?}", event);
let expected = FollowEvent::Initialized(Initialized {
finalized_block_hash: format!("{:?}", block_2_hash),
finalized_block_runtime: None,
runtime_updates: false,
});
assert_eq!(event, expected);

// Block tree:
//
// finalized -> block 1 -> block 2 -> block 4
// ^^^ finalized
// -> block 1 -> block 3
//
// Mark block 4 as finalized to force block 3 to get pruned.

let block_4 = client.new_block(Default::default()).unwrap().build().unwrap().block;
let block_4_hash = block_4.header.hash();
client.import(BlockOrigin::Own, block_4.clone()).await.unwrap();

client.finalize_block(block_4_hash, None).unwrap();

// Check block 4.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::NewBlock(NewBlock {
block_hash: format!("{:?}", block_4_hash),
parent_block_hash: format!("{:?}", block_2_hash),
new_runtime: None,
runtime_updates: false,
});
assert_eq!(event, expected);
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::BestBlockChanged(BestBlockChanged {
best_block_hash: format!("{:?}", block_4_hash),
});
assert_eq!(event, expected);

// Block 3 must not be reported as pruned.
let event: FollowEvent<String> = get_next_event(&mut sub).await;
let expected = FollowEvent::Finalized(Finalized {
finalized_block_hashes: vec![format!("{:?}", block_4_hash)],
pruned_block_hashes: vec![],
});
assert_eq!(event, expected);
}