Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
f69de7e
chainHead: Ensure reasonable distance between leaf and finalized block
lexnv Mar 1, 2024
1afeaf8
chainHead: Introduce custom error for distance too large
lexnv Mar 1, 2024
6f02a08
chainHead: Temporarily suspend subscriptions
lexnv Mar 4, 2024
831f095
chainHead: Move suspending to subscription management
lexnv Mar 4, 2024
887d380
chainHead/subs/tests: Adjust testing to suspending changes
lexnv Mar 4, 2024
34a28ec
chainHead/subs/tests: Check subspended subscriptions
lexnv Mar 4, 2024
4da5073
chainHead/subs/tests: Simplify block production
lexnv Mar 4, 2024
e51b932
chainHead: Add config for suspended subscriptions
lexnv Mar 4, 2024
9024461
chainHead: Configure the lagging distance
lexnv Mar 4, 2024
e182600
chainHead/tests: Check suspension and lagging distance
lexnv Mar 4, 2024
53b80e8
Merge remote-tracking branch 'origin/master' into lexnv/chainhead-edg…
lexnv Mar 15, 2024
0be419d
Update substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
lexnv Apr 2, 2024
7563d9a
Update substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
lexnv Apr 2, 2024
e9ae914
chainHead: Remove all active subscriptions instead of suspending time
lexnv Apr 2, 2024
e500f75
chainHead/tests: Adjust testing
lexnv Apr 2, 2024
ab5c218
Merge remote-tracking branch 'origin/lexnv/chainhead-edge-case-laggin…
lexnv Apr 2, 2024
5326ada
Merge remote-tracking branch 'origin/master' into lexnv/chainhead-edg…
lexnv Apr 3, 2024
0e276cc
chainHead: Refactor master with reserved subscriptions
lexnv Apr 3, 2024
f2cbb64
chainHead/tests: Adjust testing
lexnv Apr 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
chainHead: Remove all active subscriptions instead of suspending time
Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
  • Loading branch information
lexnv committed Apr 2, 2024
commit e9ae91480ab9d4fb3baf67f7c0822fe8af7ba264
15 changes: 1 addition & 14 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@ pub struct ChainHeadConfig {
///
/// Subscriptions are suspended for the `suspended_duration`.
pub suspend_on_lagging_distance: usize,
/// The amount of time for which the subscriptions are suspended.
///
/// Subscriptions are suspended when the distance between any leaf
/// and the finalized block is too large.
pub suspended_duration: Duration,
/// The maximum number of items reported by the `chainHead_storage` before
/// pagination is required.
pub operation_max_storage_items: usize,
Expand Down Expand Up @@ -102,19 +97,12 @@ const MAX_STORAGE_ITER_ITEMS: usize = 5;
/// Subscriptions are suspended for the `suspended_duration`.
const SUSPEND_ON_LAGGING_DISTANCE: usize = 128;

/// The amount of time for which the subscriptions are suspended.
///
/// Subscriptions are suspended when the distance between any leaf
/// and the finalized block is too large.
const SUSPENDED_DURATION: Duration = Duration::from_secs(30);

impl Default for ChainHeadConfig {
fn default() -> Self {
ChainHeadConfig {
global_max_pinned_blocks: MAX_PINNED_BLOCKS,
subscription_max_pinned_duration: MAX_PINNED_DURATION,
subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS,
suspended_duration: SUSPENDED_DURATION,
suspend_on_lagging_distance: SUSPEND_ON_LAGGING_DISTANCE,
operation_max_storage_items: MAX_STORAGE_ITER_ITEMS,
}
Expand Down Expand Up @@ -157,7 +145,6 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
config.global_max_pinned_blocks,
config.subscription_max_pinned_duration,
config.subscription_max_ongoing_operations,
config.suspended_duration,
backend,
)),
operation_max_storage_items: config.operation_max_storage_items,
Expand Down Expand Up @@ -240,7 +227,7 @@ where

if let Err(SubscriptionManagementError::BlockDistanceTooLarge) = result {
debug!(target: LOG_TARGET, "[follow][id={:?}] All subscriptions are suspended", sub_id);
subscriptions.suspend_subscriptions();
subscriptions.stop_all_subscriptions();
}

subscriptions.remove_subscription(&sub_id);
Expand Down
21 changes: 2 additions & 19 deletions substrate/client/rpc-spec-v2/src/chain_head/subscription/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,7 @@ use std::{
time::{Duration, Instant},
};

use crate::chain_head::{
chain_head::LOG_TARGET,
subscription::{suspend::SuspendSubscriptions, SubscriptionManagementError},
FollowEvent,
};
use crate::chain_head::{subscription::SubscriptionManagementError, FollowEvent};

/// The queue size after which the `sc_utils::mpsc::tracing_unbounded` would produce warnings.
const QUEUE_SIZE_WARNING: usize = 512;
Expand Down Expand Up @@ -564,8 +560,6 @@ pub struct SubscriptionsInner<Block: BlockT, BE: Backend<Block>> {
max_ongoing_operations: usize,
/// Map the subscription ID to internal details of the subscription.
subs: HashMap<String, SubscriptionState<Block>>,
/// Suspend subscriptions for a given amount of time.
suspend: SuspendSubscriptions,

/// Backend pinning / unpinning blocks.
///
Expand All @@ -579,7 +573,6 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
global_max_pinned_blocks: usize,
local_max_pin_duration: Duration,
max_ongoing_operations: usize,
suspend_duration: Duration,
backend: Arc<BE>,
) -> Self {
SubscriptionsInner {
Expand All @@ -588,7 +581,6 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
local_max_pin_duration,
max_ongoing_operations,
subs: Default::default(),
suspend: SuspendSubscriptions::new(suspend_duration),
backend,
}
}
Expand All @@ -599,11 +591,6 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
sub_id: String,
with_runtime: bool,
) -> Option<InsertedSubscriptionData<Block>> {
if self.suspend.is_suspended() {
log::trace!(target: LOG_TARGET, "[id={:?}] Subscription already suspended", sub_id);
return None
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm; is the point of removing this now that we will allow new subscriptions to be added any time, but stop them all regardless whenever generate_events says that the distance is too large? Whereas before we'd not allow new subscriptions to be added for the timeout period.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, that makes sense

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! The changes def simplify things a fair bit so I'm in favour, and the block distance being too large is def an edge case anyways :)


if let Entry::Vacant(entry) = self.subs.entry(sub_id) {
let (tx_stop, rx_stop) = oneshot::channel();
let (response_sender, response_receiver) =
Expand Down Expand Up @@ -637,12 +624,8 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionsInner<Block, BE> {
}
}

/// Suspends all subscriptions for the given duration.
///
/// All active subscriptions are removed.
pub fn suspend_subscriptions(&mut self) {
self.suspend.suspend_subscriptions();

pub fn stop_all_subscriptions(&mut self) {
let to_remove: Vec<_> = self.subs.keys().map(|sub_id| sub_id.clone()).collect();

for sub_id in to_remove {
Expand Down
11 changes: 3 additions & 8 deletions substrate/client/rpc-spec-v2/src/chain_head/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::{sync::Arc, time::Duration};

mod error;
mod inner;
mod suspend;

use self::inner::SubscriptionsInner;

Expand All @@ -44,15 +43,13 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
global_max_pinned_blocks: usize,
local_max_pin_duration: Duration,
max_ongoing_operations: usize,
suspend_duration: Duration,
backend: Arc<BE>,
) -> Self {
SubscriptionManagement {
inner: RwLock::new(SubscriptionsInner::new(
global_max_pinned_blocks,
local_max_pin_duration,
max_ongoing_operations,
suspend_duration,
backend,
)),
}
Expand All @@ -78,15 +75,13 @@ impl<Block: BlockT, BE: Backend<Block>> SubscriptionManagement<Block, BE> {
inner.remove_subscription(sub_id)
}

/// Suspends all subscriptions for the given duration.
/// Stop all active subscriptions.
///
/// For all active subscriptions, the internal data is discarded, blocks are unpinned and the
/// `Stop` event will be generated.
///
/// For incoming subscriptions, only the `Stop` event is delivered.
pub fn suspend_subscriptions(&self) {
pub fn stop_all_subscriptions(&self) {
let mut inner = self.inner.write();
inner.suspend_subscriptions()
inner.stop_all_subscriptions()
}

/// The block is pinned in the backend only once when the block's hash is first encountered.
Expand Down

This file was deleted.