This repository was archived by the owner on Nov 15, 2023. It is now read-only.
This pull request is closed.
26 changes: 26 additions & 0 deletions client/api/src/client.rs
@@ -36,6 +36,9 @@ pub type ImportNotifications<Block> = mpsc::UnboundedReceiver<BlockImportNotific
/// A stream of block finality notifications.
pub type FinalityNotifications<Block> = mpsc::UnboundedReceiver<FinalityNotification<Block>>;

/// A stream of block import events.
pub type AllBlocksNotifications<Block> = mpsc::UnboundedReceiver<AllBlocksNotification<Block>>;

/// Expected hashes of blocks at given heights.
///
/// This may be used as chain spec extension to set trusted checkpoints, i.e.
@@ -64,6 +67,13 @@ pub trait BlockchainEvents<Block: BlockT> {
/// finalized block.
fn finality_notification_stream(&self) -> FinalityNotifications<Block>;

/// Get a stream of block import notifications. Guaranteed to fire for every
/// imported block, even during a major sync.
///
/// Use cautiously, because it may have a significant impact on performance. Note that the events
/// are sent over an unbounded channel, so make sure to process them as soon as possible.
fn all_blocks_notification_stream(&self) -> AllBlocksNotifications<Block>;

/// Get storage changes event stream.
///
/// Passing `None` as `filter_keys` subscribes to all storage changes.
@@ -242,3 +252,19 @@ pub struct FinalityNotification<Block: BlockT> {
/// Imported block header.
pub header: Block::Header,
}


/// Summary of an imported block.
#[derive(Clone, Debug)]
pub struct AllBlocksNotification<Block: BlockT> {
/// Imported block header hash.
pub hash: Block::Hash,
/// Imported block origin.
pub origin: BlockOrigin,
/// Imported block header.
pub header: Block::Header,
/// Is this the new best block.
pub is_new_best: bool,
}
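
As a rough usage sketch (not part of the diff), a consumer could subscribe through `BlockchainEvents::all_blocks_notification_stream` and drain the channel promptly. The function name, the `log` calls, and the exact import paths below are assumptions for illustration only:

```rust
// Minimal consumer sketch; assumes the crate layout of this PR and that
// `BlockchainEvents` is re-exported from the `sc-client-api` crate root.
use futures::StreamExt;
use sc_client_api::BlockchainEvents;
use sp_runtime::traits::Block as BlockT;

async fn follow_all_blocks<Block: BlockT, C: BlockchainEvents<Block>>(client: &C) {
    let mut notifications = client.all_blocks_notification_stream();
    while let Some(n) = notifications.next().await {
        // Fired for every imported block, including during a major sync,
        // so keep the per-block work here as cheap as possible.
        log::debug!(
            "imported block {:?} (origin: {:?}, new best: {})",
            n.hash, n.origin, n.is_new_best,
        );
    }
}
```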
16 changes: 8 additions & 8 deletions client/service/src/builder.rs
@@ -916,23 +916,23 @@ ServiceBuilder<
let is_validator = config.roles.is_authority();

let (import_stream, finality_stream) = (
client.import_notification_stream().map(|n| ChainEvent::NewBlock {
id: BlockId::Hash(n.hash),
header: n.header,
retracted: n.retracted,
is_new_best: n.is_new_best,
}),
client.all_blocks_notification_stream()
.map(|n| ChainEvent::BlockReceived {
header: n.header,
is_initial_sync: n.origin == BlockOrigin::NetworkInitialSync,
is_new_best: n.is_new_best,
}),
client.finality_notification_stream().map(|n| ChainEvent::Finalized {
hash: n.hash
})
);
let events = futures::stream::select(import_stream, finality_stream)
.for_each(move |event| {
// offchain worker is only interested in block import events
if let ChainEvent::NewBlock { ref header, is_new_best, .. } = event {
if let ChainEvent::BlockReceived { ref header, is_new_best, is_initial_sync, .. } = event {
let offchain = offchain.as_ref().and_then(|o| o.upgrade());
match offchain {
Some(offchain) if is_new_best => {
Some(offchain) if is_new_best && !is_initial_sync => {
notifications_spawn_handle.spawn(
"offchain-on-block",
offchain.on_block_imported(
47 changes: 39 additions & 8 deletions client/src/client.rs
@@ -95,6 +95,7 @@ pub struct Client<B, E, Block, RA> where Block: BlockT {
storage_notifications: Mutex<StorageNotifications<Block>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification<Block>>>>,
finality_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<FinalityNotification<Block>>>>,
all_blocks_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<AllBlocksNotification<Block>>>>,
Contributor:
I'm quite hesitant about adding another unbounded channel here. This may become very unreliable; imagine a case where:

  1. You have some non-trivial work to do on every block
  2. The channel queues up a bunch of blocks, because you are not processing them fast enough.
  3. The state gets pruned in the meantime.

The whole scheme is then useless, because even when we receive the notification, the state is already lost. Also, if the node is turned off, the whole content of the channel is lost, so we will essentially miss running for some blocks.

If we want to make it super reliable and make sure that EACH and EVERY block is indexed/processed, we need to come up with a more sophisticated mechanism.
I imagine something like an LMAX-Disruptor scheme, where we:

  1. Persist the state of indexing in some DB
  2. Pin the state, up until the slower consumer processes it.
  3. Proceed as fast as possible with fast consumers.

Contributor:
> I'm quite hesitant about adding another unbounded channel here.

Same here.

// holds the block hash currently being imported. TODO: replace this with block queue
importing_block: RwLock<Option<Block::Hash>>,
block_rules: BlockRules<Block>,
@@ -279,6 +280,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
storage_notifications: Default::default(),
import_notification_sinks: Default::default(),
finality_notification_sinks: Default::default(),
all_blocks_notification_sinks: Default::default(),
importing_block: Default::default(),
block_rules: BlockRules::new(fork_blocks, bad_blocks),
execution_extensions,
@@ -742,21 +744,30 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where

operation.op.insert_aux(aux)?;


let header = import_headers.into_post();
Contributor:
Might be worth constructing this lazily for all_blocks. Also, I'd consider passing less data in the all_blocks notification, for performance reasons.

Contributor Author:
Passing less data is IMHO not possible, since the header is required in https://github.com/paritytech/substrate/pull/5455/files#diff-240c17a1e4dbce4ad137b7777bf03ce4R919-R924, which in turn requires the header to be extracted from import_headers - correct me if I am wrong.

Contributor:
If there can be many all-blocks notification streams, one could pass an Arc<AllBlocksNotification> down the channels instead of a clone for each channel within notify_any_block_imported.

We do similar things for NetworkService::event_stream, where we pass Bytes instead of a Vec<u8>.
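
A rough sketch of that suggestion (an assumption, not what the PR implements; the sink type and the function name are invented for illustration):

```rust
// Hypothetical variant of `notify_any_block_imported` where the sinks carry
// `Arc<AllBlocksNotification<Block>>`, so the (possibly large) header is cloned
// once per import instead of once per subscriber. `AllBlocksNotification` is the
// type added in client/api/src/client.rs above.
use std::sync::Arc;
use futures::channel::mpsc;
use sp_runtime::traits::Block as BlockT;

fn notify_any_block_imported_shared<Block: BlockT>(
    sinks: &mut Vec<mpsc::UnboundedSender<Arc<AllBlocksNotification<Block>>>>,
    notification: AllBlocksNotification<Block>,
) {
    let notification = Arc::new(notification);
    // Only the `Arc` pointer is cloned per sink; closed sinks are dropped.
    sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
}
```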

let import_summary = ImportSummary {
hash,
origin,
header,
is_new_best,
storage_changes,
retracted,
};

self.notify_any_block_imported(
&import_summary
)?;

if make_notifications {
if finalized {
operation.notify_finalized.push(hash);
}

operation.notify_imported = Some(ImportSummary {
hash,
origin,
header: import_headers.into_post(),
is_new_best,
storage_changes,
retracted,
})
operation.notify_imported = Some(import_summary);
}


Ok(ImportResult::imported(is_new_best))
}

@@ -971,6 +982,20 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where
Ok(())
}

fn notify_any_block_imported(&self, notify_import: &ImportSummary<Block>) -> sp_blockchain::Result<()> {
let notification = AllBlocksNotification::<Block> {
hash: notify_import.hash.clone(),
origin: notify_import.origin.clone(),
header: notify_import.header.clone(),
is_new_best: notify_import.is_new_best,
};

self.all_blocks_notification_sinks.lock()
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());

Ok(())
}

/// Attempts to revert the chain by `n` blocks guaranteeing that no block is
/// reverted past the last finalized block. Returns the number of blocks
/// that were successfully reverted.
@@ -1775,6 +1800,12 @@ where
stream
}

fn all_blocks_notification_stream(&self) -> AllBlocksNotifications<Block> {
let (sink, stream) = mpsc::unbounded();
self.all_blocks_notification_sinks.lock().push(sink);
stream
}

/// Get storage changes event stream.
fn storage_changes_notification_stream(
&self,
4 changes: 4 additions & 0 deletions client/transaction-pool/src/lib.rs
@@ -465,6 +465,10 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
}
}.boxed()
}
ChainEvent::BlockReceived { .. } => {
// no-op: except during initial block sync, `NewBlock` already handles this case
Box::pin(ready(()))
}
}
}
}
11 changes: 11 additions & 0 deletions primitives/transaction-pool/src/pool.rs
@@ -256,6 +256,17 @@ pub enum ChainEvent<B: BlockT> {
/// List of retracted blocks ordered by block number.
retracted: Vec<B::Hash>,
},

/// A block has been received and added to the chain.
BlockReceived {
/// Header of the just imported block.
header: B::Header,
/// Is this the new best block.
is_new_best: bool,
/// Whether this block event was caused by the initial sync of the node.
is_initial_sync: bool,
},

/// An existing block has been finalized.
Finalized {
/// Hash of just finalized block
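
For downstream consumers, the new variant means one extra arm to match on. A minimal, illustrative handler (the function, the import paths, and what each arm does are assumptions, not part of the diff) could look like this:

```rust
// Illustrative, exhaustive match over the extended `ChainEvent` enum; the exact
// re-export path of `ChainEvent` from the sp-transaction-pool crate is assumed.
use sp_runtime::traits::Block as BlockT;
use sp_transaction_pool::ChainEvent;

fn handle_chain_event<B: BlockT>(event: ChainEvent<B>) {
    match event {
        ChainEvent::NewBlock { is_new_best, .. } => {
            // Existing behaviour, e.g. prune/revalidate the pool for new blocks.
            let _ = is_new_best;
        }
        ChainEvent::BlockReceived { is_initial_sync, .. } => {
            // New variant: fired for every imported block, including during the
            // initial sync; consumers that don't care can simply ignore it.
            let _ = is_initial_sync;
        }
        ChainEvent::Finalized { hash } => {
            // Existing behaviour: react to finality of `hash`.
            let _ = hash;
        }
    }
}
```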