Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
f69de7e
chainHead: Ensure reasonable distance between leaf and finalized block
lexnv Mar 1, 2024
1afeaf8
chainHead: Introduce custom error for distance too large
lexnv Mar 1, 2024
6f02a08
chainHead: Temporarily suspend subscriptions
lexnv Mar 4, 2024
831f095
chainHead: Move suspending to subscription management
lexnv Mar 4, 2024
887d380
chainHead/subs/tests: Adjust testing to suspending changes
lexnv Mar 4, 2024
34a28ec
chainHead/subs/tests: Check subspended subscriptions
lexnv Mar 4, 2024
4da5073
chainHead/subs/tests: Simplify block production
lexnv Mar 4, 2024
e51b932
chainHead: Add config for suspended subscriptions
lexnv Mar 4, 2024
9024461
chainHead: Configure the lagging distance
lexnv Mar 4, 2024
e182600
chainHead/tests: Check suspension and lagging distance
lexnv Mar 4, 2024
53b80e8
Merge remote-tracking branch 'origin/master' into lexnv/chainhead-edg…
lexnv Mar 15, 2024
0be419d
Update substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
lexnv Apr 2, 2024
7563d9a
Update substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
lexnv Apr 2, 2024
e9ae914
chainHead: Remove all active subscriptions instead of suspending time
lexnv Apr 2, 2024
e500f75
chainHead/tests: Adjust testing
lexnv Apr 2, 2024
ab5c218
Merge remote-tracking branch 'origin/lexnv/chainhead-edge-case-laggin…
lexnv Apr 2, 2024
5326ada
Merge remote-tracking branch 'origin/master' into lexnv/chainhead-edg…
lexnv Apr 3, 2024
0e276cc
chainHead: Refactor master with reserved subscriptions
lexnv Apr 3, 2024
f2cbb64
chainHead/tests: Adjust testing
lexnv Apr 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chainHead/tests: Adjust testing
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv committed Apr 2, 2024
commit e500f753809845e38c6e7935b81d9fe5efcde35f
30 changes: 13 additions & 17 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,9 @@ pub struct ChainHeadConfig {
pub subscription_max_pinned_duration: Duration,
/// The maximum number of ongoing operations per subscription.
pub subscription_max_ongoing_operations: usize,
/// Suspend the subscriptions if the distance between the leaves and the current finalized
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
///
/// Subscriptions are suspended for the `suspended_duration`.
pub suspend_on_lagging_distance: usize,
pub max_lagging_distance: usize,
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
pub operation_max_storage_items: usize,
Expand All @@ -91,19 +89,17 @@ const MAX_ONGOING_OPERATIONS: usize = 16;
/// before paginations is required.
const MAX_STORAGE_ITER_ITEMS: usize = 5;

/// Suspend the subscriptions if the distance between the leaves and the current finalized
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
///
/// Subscriptions are suspended for the `suspended_duration`.
const SUSPEND_ON_LAGGING_DISTANCE: usize = 128;
const MAX_LAGGING_DISTANCE: usize = 128;

impl Default for ChainHeadConfig {
fn default() -> Self {
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
suspend_on_lagging_distance: SUSPEND_ON_LAGGING_DISTANCE,
max_lagging_distance: MAX_LAGGING_DISTANCE,
operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
}
}
Expand All @@ -122,9 +118,9 @@ pub struct ChainHead<BE: Backend<Block>, Block: BlockT, Client> {
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
operation_max_storage_items: usize,
/// Suspend the subscriptions if the distance between the leaves and the current finalized
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
suspend_on_lagging_distance: usize,
max_lagging_distance: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}
Expand All @@ -148,7 +144,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
backend,
)),
operation_max_storage_items: config.operation_max_storage_items,
suspend_on_lagging_distance: config.suspend_on_lagging_distance,
max_lagging_distance: config.max_lagging_distance,
_phantom: PhantomData,
}
}
Expand Down Expand Up @@ -196,7 +192,7 @@ where
let subscriptions = self.subscriptions.clone();
let backend = self.backend.clone();
let client = self.client.clone();
let suspend_on_lagging_distance = self.suspend_on_lagging_distance;
let max_lagging_distance = self.max_lagging_distance;

let fut = async move {
let Ok(sink) = pending.accept().await else { return };
Expand All @@ -207,8 +203,8 @@ where
let Some(sub_data) = subscriptions.insert_subscription(sub_id.clone(), with_runtime)
else {
// Inserting the subscription can only fail if the JsonRPSee generated a duplicate
// subscription ID; or subscriptions are suspended.
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted or suspended", sub_id);
// subscription ID.
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
return
Expand All @@ -221,12 +217,12 @@ where
subscriptions.clone(),
with_runtime,
sub_id.clone(),
suspend_on_lagging_distance,
max_lagging_distance,
);
let result = chain_head_follow.generate_events(sink, sub_data).await;

if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are suspended", sub_id);
debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are stopped", sub_id);
subscriptions.stop_all_subscriptions();
}

Expand Down
10 changes: 5 additions & 5 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ pub struct ChainHeadFollower<BE: Backend<Block>, Block: BlockT, Client> {
sub_id: String,
/// The best reported block by this subscription.
best_block_cache: Option<Block::Hash>,
/// Suspend the subscriptions if the distance between the leaves and the current finalized
/// Stop all subscriptions if the distance between the leaves and the current finalized
/// block is larger than this value.
suspend_on_lagging_distance: usize,
max_lagging_distance: usize,
}

impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Client> {
Expand All @@ -82,7 +82,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
sub_handle: Arc<SubscriptionManagement<Block, BE>>,
with_runtime: bool,
sub_id: String,
suspend_on_lagging_distance: usize,
max_lagging_distance: usize,
) -> Self {
Self {
client,
Expand All @@ -91,7 +91,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHeadFollower<BE, Block, Cli
with_runtime,
sub_id,
best_block_cache: None,
suspend_on_lagging_distance,
max_lagging_distance,
}
}
}
Expand Down Expand Up @@ -222,7 +222,7 @@ where
};

let distance: usize = block_num.saturating_sub(finalized_num).saturated_into();
if distance > self.suspend_on_lagging_distance {
if distance > self.max_lagging_distance {
return Err(SubscriptionManagementError::BlockDistanceTooLarge);
}

Expand Down
108 changes: 22 additions & 86 deletions substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1042,13 +1042,8 @@ mod tests {
let hashes = produce_blocks(client, 3);
let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);

let mut subs = SubscriptionsInner::new(
10,
Duration::from_secs(10),
MAX_OPERATIONS_PER_SUB,
Duration::from_secs(10),
backend,
);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();

Expand Down Expand Up @@ -1087,13 +1082,8 @@ mod tests {
fn subscription_lock_block() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut subs = SubscriptionsInner::new(
10,
Duration::from_secs(10),
MAX_OPERATIONS_PER_SUB,
Duration::from_secs(10),
backend,
);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);

let id = "abc".to_string();
let hash = H256::random();
Expand Down Expand Up @@ -1124,13 +1114,8 @@ mod tests {
let hashes = produce_blocks(client, 1);
let hash = hashes[0];

let mut subs = SubscriptionsInner::new(
10,
Duration::from_secs(10),
MAX_OPERATIONS_PER_SUB,
Duration::from_secs(10),
backend,
);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id = "abc".to_string();

let _stop = subs.insert_subscription(id.clone(), true).unwrap();
Expand Down Expand Up @@ -1159,13 +1144,8 @@ mod tests {
let hashes = produce_blocks(client, 1);
let hash = hashes[0];

let mut subs = SubscriptionsInner::new(
10,
Duration::from_secs(10),
MAX_OPERATIONS_PER_SUB,
Duration::from_secs(10),
backend,
);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id = "abc".to_string();

let _stop = subs.insert_subscription(id.clone(), true).unwrap();
Expand Down Expand Up @@ -1207,13 +1187,8 @@ mod tests {
let hashes = produce_blocks(client, 3);
let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);

let mut subs = SubscriptionsInner::new(
10,
Duration::from_secs(10),
MAX_OPERATIONS_PER_SUB,
Duration::from_secs(10),
backend,
);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();

Expand Down Expand Up @@ -1252,13 +1227,8 @@ mod tests {
let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);

// Maximum number of pinned blocks is 2.
let mut subs = SubscriptionsInner::new(
2,
Duration::from_secs(10),
MAX_OPERATIONS_PER_SUB,
Duration::from_secs(10),
backend,
);
let mut subs =
SubscriptionsInner::new(2, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();

Expand Down Expand Up @@ -1302,13 +1272,8 @@ mod tests {
let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);

// Maximum number of pinned blocks is 2 and maximum pin duration is 5 second.
let mut subs = SubscriptionsInner::new(
2,
Duration::from_secs(5),
MAX_OPERATIONS_PER_SUB,
Duration::from_secs(10),
backend,
);
let mut subs =
SubscriptionsInner::new(2, Duration::from_secs(5), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();

Expand Down Expand Up @@ -1357,13 +1322,8 @@ mod tests {
fn subscription_check_stop_event() {
let builder = TestClientBuilder::new();
let backend = builder.backend();
let mut subs = SubscriptionsInner::new(
10,
Duration::from_secs(10),
MAX_OPERATIONS_PER_SUB,
Duration::from_secs(10),
backend,
);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);

let id = "abc".to_string();

Expand Down Expand Up @@ -1408,22 +1368,16 @@ mod tests {
}

#[test]
fn suspend_subscriptions() {
fn stop_all_subscriptions() {
let (backend, client) = init_backend();

let hashes = produce_blocks(client, 3);
let (hash_1, hash_2, hash_3) = (hashes[0], hashes[1], hashes[2]);

let mut subs = SubscriptionsInner::new(
10,
Duration::from_secs(10),
MAX_OPERATIONS_PER_SUB,
Duration::from_secs(3),
backend,
);
let mut subs =
SubscriptionsInner::new(10, Duration::from_secs(10), MAX_OPERATIONS_PER_SUB, backend);
let id_1 = "abc".to_string();
let id_2 = "abcd".to_string();
let id_3 = "abcde".to_string();

// Pin all blocks for the first subscription.
let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
Expand All @@ -1441,26 +1395,8 @@ mod tests {
assert_eq!(*subs.global_blocks.get(&hash_3).unwrap(), 1);
assert_eq!(subs.global_blocks.len(), 3);

// Suspend all subscriptions.
assert!(!subs.suspend.is_suspended());
subs.suspend_subscriptions();
assert!(subs.suspend.is_suspended());

// A new subscription cannot be inserted while suspended.
let result = subs.insert_subscription(id_3.clone(), true);
assert!(result.is_none());

// Check reference count.
assert_eq!(subs.global_blocks.len(), 0);

// Sleep 5 seconds.
std::thread::sleep(std::time::Duration::from_secs(5));

assert!(!subs.suspend.is_suspended());

// Subscriptions can be inserted again.
let _stop = subs.insert_subscription(id_1.clone(), true).unwrap();
let _stop = subs.insert_subscription(id_2.clone(), true).unwrap();
let _stop = subs.insert_subscription(id_3.clone(), true).unwrap();
// Stop all active subscriptions.
subs.stop_all_subscriptions();
assert!(subs.global_blocks.is_empty());
}
}
Loading