From 0c8037d9cc13405b534f5125777bf37e5558be4f Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Mon, 30 Mar 2020 11:18:35 +0200 Subject: [PATCH 1/5] feat/blocks: add a stream for all blocks Includes sync blocks too. --- client/api/src/client.rs | 24 +++++++++++++ client/service/src/builder.rs | 16 ++++----- client/src/client.rs | 48 ++++++++++++++++++++----- client/transaction-pool/src/lib.rs | 4 +++ primitives/transaction-pool/src/pool.rs | 11 ++++++ 5 files changed, 87 insertions(+), 16 deletions(-) diff --git a/client/api/src/client.rs b/client/api/src/client.rs index 06d49da640da6..a7a18a32a08bd 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -36,6 +36,10 @@ pub type ImportNotifications = mpsc::UnboundedReceiver = mpsc::UnboundedReceiver>; + +/// A stream of block import events. +pub type AllBlocksNotifications = mpsc::UnboundedReceiver>; + /// Expected hashes of blocks at given heights. /// /// This may be used as chain spec extension to set trusted checkpoints, i.e. @@ -64,6 +68,10 @@ pub trait BlockchainEvents { /// finalized block. fn finality_notification_stream(&self) -> FinalityNotifications; + /// Get a block import stream of notification. Guaranteed to be fired for every + /// imported block. + fn all_blocks_notification_stream(&self) -> AllBlocksNotifications; + /// Get storage changes event stream. /// /// Passing `None` as `filter_keys` subscribes to all storage changes. @@ -242,3 +250,19 @@ pub struct FinalityNotification { /// Imported block header. pub header: Block::Header, } + + +/// Summary of an imported block +#[derive(Clone, Debug)] +pub struct AllBlocksNotification { + /// Determines if this block notification is part + /// of an initial sync of the node with the network. + /// Imported block header hash. + pub hash: Block::Hash, + /// Imported block origin. + pub origin: BlockOrigin, + /// Imported block header. + pub header: Block::Header, + /// Is this the new best block. + pub is_new_best: bool, +} \ No newline at end of file diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 4363e204c07a8..f218ff9bed540 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -916,12 +916,12 @@ ServiceBuilder< let is_validator = config.roles.is_authority(); let (import_stream, finality_stream) = ( - client.import_notification_stream().map(|n| ChainEvent::NewBlock { - id: BlockId::Hash(n.hash), - header: n.header, - retracted: n.retracted, - is_new_best: n.is_new_best, - }), + client.all_blocks_notification_stream() + .map(|n| ChainEvent::BlockReceived { + header: n.header, + is_initial_sync: n.origin == BlockOrigin::NetworkInitialSync, + is_new_best: n.is_new_best, + }), client.finality_notification_stream().map(|n| ChainEvent::Finalized { hash: n.hash }) @@ -929,10 +929,10 @@ ServiceBuilder< let events = futures::stream::select(import_stream, finality_stream) .for_each(move |event| { // offchain worker is only interested in block import events - if let ChainEvent::NewBlock { ref header, is_new_best, .. } = event { + if let ChainEvent::BlockReceived { ref header, is_new_best, is_initial_sync, .. } = event { let offchain = offchain.as_ref().and_then(|o| o.upgrade()); match offchain { - Some(offchain) if is_new_best => { + Some(offchain) if is_initial_sync || is_new_best => { notifications_spawn_handle.spawn( "offchain-on-block", offchain.on_block_imported( diff --git a/client/src/client.rs b/client/src/client.rs index 5c7fa80291ba9..318437ed1d4e7 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -95,6 +95,7 @@ pub struct Client where Block: BlockT { storage_notifications: Mutex>, import_notification_sinks: Mutex>>>, finality_notification_sinks: Mutex>>>, + all_blocks_notification_sinks: Mutex>>>, // holds the block hash currently being imported. TODO: replace this with block queue importing_block: RwLock>, block_rules: BlockRules, @@ -279,6 +280,7 @@ impl Client where storage_notifications: Default::default(), import_notification_sinks: Default::default(), finality_notification_sinks: Default::default(), + all_blocks_notification_sinks: Default::default(), importing_block: Default::default(), block_rules: BlockRules::new(fork_blocks, bad_blocks), execution_extensions, @@ -742,21 +744,30 @@ impl Client where operation.op.insert_aux(aux)?; + + let header = import_headers.into_post(); + let import_summary = ImportSummary { + hash, + origin, + header, + is_new_best, + storage_changes, + retracted, + }; + + self.notify_any_block_imported( + &import_summary + )?; + if make_notifications { if finalized { operation.notify_finalized.push(hash); } - operation.notify_imported = Some(ImportSummary { - hash, - origin, - header: import_headers.into_post(), - is_new_best, - storage_changes, - retracted, - }) + operation.notify_imported = Some(import_summary); } + Ok(ImportResult::imported(is_new_best)) } @@ -971,6 +982,21 @@ impl Client where Ok(()) } + fn notify_any_block_imported(&self, notify_import: &ImportSummary) -> sp_blockchain::Result<()> { + + let notification = AllBlocksNotification:: { + hash: notify_import.hash.clone(), + origin: notify_import.origin.clone(), + header: notify_import.header.clone(), + is_new_best: notify_import.is_new_best, + }; + + self.all_blocks_notification_sinks.lock() + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + + Ok(()) + } + /// Attempts to revert the chain by `n` blocks guaranteeing that no block is /// reverted past the last finalized block. Returns the number of blocks /// that were successfully reverted. @@ -1775,6 +1801,12 @@ where stream } + fn all_blocks_notification_stream(&self) -> AllBlocksNotifications { + let (sink, stream) = mpsc::unbounded(); + self.all_blocks_notification_sinks.lock().push(sink); + stream + } + /// Get storage changes event stream. fn storage_changes_notification_stream( &self, diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 29d9a4fc1db36..b7df14f6c8c82 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -465,6 +465,10 @@ impl MaintainedTransactionPool for BasicPool } }.boxed() } + ChainEvent::BlockReceived{ .. } => { + // nop, except for initial block sync, NewBlock handles this case + Box::pin(ready(())) + } } } } diff --git a/primitives/transaction-pool/src/pool.rs b/primitives/transaction-pool/src/pool.rs index cad06679647e4..f6b01ccd23636 100644 --- a/primitives/transaction-pool/src/pool.rs +++ b/primitives/transaction-pool/src/pool.rs @@ -256,6 +256,17 @@ pub enum ChainEvent { /// List of retracted blocks ordered by block number. retracted: Vec, }, + + /// A block has been received and is added to the chain + BlockReceived { + /// Header of the just imported block. + header: B::Header, + /// Is this the new best block. + is_new_best: bool, + /// A block event cause by an initial sync of a node. + is_initial_sync: bool, + }, + /// An existing block has been finalized. Finalized { /// Hash of just finalized block From 6ae48d3454029ff0123e5b930734eb55233d7502 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Mon, 30 Mar 2020 15:54:16 +0200 Subject: [PATCH 2/5] Update client/api/src/client.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Tomasz Drwięga --- client/api/src/client.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/client/api/src/client.rs b/client/api/src/client.rs index a7a18a32a08bd..5372d892e7f9b 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -36,7 +36,6 @@ pub type ImportNotifications = mpsc::UnboundedReceiver = mpsc::UnboundedReceiver>; - /// A stream of block import events. pub type AllBlocksNotifications = mpsc::UnboundedReceiver>; @@ -265,4 +264,4 @@ pub struct AllBlocksNotification { pub header: Block::Header, /// Is this the new best block. pub is_new_best: bool, -} \ No newline at end of file +} From c5e6efdaf3c1c52ccbb2489bbdebfc6f5f3ad83d Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Mon, 30 Mar 2020 15:54:28 +0200 Subject: [PATCH 3/5] Update client/api/src/client.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Tomasz Drwięga --- client/api/src/client.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/client/api/src/client.rs b/client/api/src/client.rs index 5372d892e7f9b..eca7cad7cef50 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -68,7 +68,10 @@ pub trait BlockchainEvents { fn finality_notification_stream(&self) -> FinalityNotifications; /// Get a block import stream of notification. Guaranteed to be fired for every - /// imported block. + /// imported block even when doing a major sync. + /// + /// Use cautiously, cause it may have a significant impact on performance. Note that the events are being + /// sent using unbounded channel, so make sure to process them as soon as possible. fn all_blocks_notification_stream(&self) -> AllBlocksNotifications; /// Get storage changes event stream. From 6edd447e5cf6048359bb6beba61de9094e19ff0e Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Mon, 30 Mar 2020 15:54:37 +0200 Subject: [PATCH 4/5] Update client/service/src/builder.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Tomasz Drwięga --- client/service/src/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index f218ff9bed540..bc945dc42db23 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -932,7 +932,7 @@ ServiceBuilder< if let ChainEvent::BlockReceived { ref header, is_new_best, is_initial_sync, .. } = event { let offchain = offchain.as_ref().and_then(|o| o.upgrade()); match offchain { - Some(offchain) if is_initial_sync || is_new_best => { + Some(offchain) if is_new_best && !is_initial_sync => { notifications_spawn_handle.spawn( "offchain-on-block", offchain.on_block_imported( From 738cfad3198a8f53a9792c91e1094b4f8c332473 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Mon, 30 Mar 2020 15:54:46 +0200 Subject: [PATCH 5/5] Update client/src/client.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Tomasz Drwięga --- client/src/client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/client/src/client.rs b/client/src/client.rs index 318437ed1d4e7..2e8b69d67ff69 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -983,7 +983,6 @@ impl Client where } fn notify_any_block_imported(&self, notify_import: &ImportSummary) -> sp_blockchain::Result<()> { - let notification = AllBlocksNotification:: { hash: notify_import.hash.clone(), origin: notify_import.origin.clone(),