diff --git a/client/api/src/client.rs b/client/api/src/client.rs index 06d49da640da6..eca7cad7cef50 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -36,6 +36,9 @@ pub type ImportNotifications = mpsc::UnboundedReceiver = mpsc::UnboundedReceiver>; +/// A stream of block import events. +pub type AllBlocksNotifications = mpsc::UnboundedReceiver>; + /// Expected hashes of blocks at given heights. /// /// This may be used as chain spec extension to set trusted checkpoints, i.e. @@ -64,6 +67,13 @@ pub trait BlockchainEvents { /// finalized block. fn finality_notification_stream(&self) -> FinalityNotifications; + /// Get a block import stream of notification. Guaranteed to be fired for every + /// imported block even when doing a major sync. + /// + /// Use cautiously, cause it may have a significant impact on performance. Note that the events are being + /// sent using unbounded channel, so make sure to process them as soon as possible. + fn all_blocks_notification_stream(&self) -> AllBlocksNotifications; + /// Get storage changes event stream. /// /// Passing `None` as `filter_keys` subscribes to all storage changes. @@ -242,3 +252,19 @@ pub struct FinalityNotification { /// Imported block header. pub header: Block::Header, } + + +/// Summary of an imported block +#[derive(Clone, Debug)] +pub struct AllBlocksNotification { + /// Determines if this block notification is part + /// of an initial sync of the node with the network. + /// Imported block header hash. + pub hash: Block::Hash, + /// Imported block origin. + pub origin: BlockOrigin, + /// Imported block header. + pub header: Block::Header, + /// Is this the new best block. + pub is_new_best: bool, +} diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 4363e204c07a8..bc945dc42db23 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -916,12 +916,12 @@ ServiceBuilder< let is_validator = config.roles.is_authority(); let (import_stream, finality_stream) = ( - client.import_notification_stream().map(|n| ChainEvent::NewBlock { - id: BlockId::Hash(n.hash), - header: n.header, - retracted: n.retracted, - is_new_best: n.is_new_best, - }), + client.all_blocks_notification_stream() + .map(|n| ChainEvent::BlockReceived { + header: n.header, + is_initial_sync: n.origin == BlockOrigin::NetworkInitialSync, + is_new_best: n.is_new_best, + }), client.finality_notification_stream().map(|n| ChainEvent::Finalized { hash: n.hash }) @@ -929,10 +929,10 @@ ServiceBuilder< let events = futures::stream::select(import_stream, finality_stream) .for_each(move |event| { // offchain worker is only interested in block import events - if let ChainEvent::NewBlock { ref header, is_new_best, .. } = event { + if let ChainEvent::BlockReceived { ref header, is_new_best, is_initial_sync, .. } = event { let offchain = offchain.as_ref().and_then(|o| o.upgrade()); match offchain { - Some(offchain) if is_new_best => { + Some(offchain) if is_new_best && !is_initial_sync => { notifications_spawn_handle.spawn( "offchain-on-block", offchain.on_block_imported( diff --git a/client/src/client.rs b/client/src/client.rs index 5c7fa80291ba9..2e8b69d67ff69 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -95,6 +95,7 @@ pub struct Client where Block: BlockT { storage_notifications: Mutex>, import_notification_sinks: Mutex>>>, finality_notification_sinks: Mutex>>>, + all_blocks_notification_sinks: Mutex>>>, // holds the block hash currently being imported. TODO: replace this with block queue importing_block: RwLock>, block_rules: BlockRules, @@ -279,6 +280,7 @@ impl Client where storage_notifications: Default::default(), import_notification_sinks: Default::default(), finality_notification_sinks: Default::default(), + all_blocks_notification_sinks: Default::default(), importing_block: Default::default(), block_rules: BlockRules::new(fork_blocks, bad_blocks), execution_extensions, @@ -742,21 +744,30 @@ impl Client where operation.op.insert_aux(aux)?; + + let header = import_headers.into_post(); + let import_summary = ImportSummary { + hash, + origin, + header, + is_new_best, + storage_changes, + retracted, + }; + + self.notify_any_block_imported( + &import_summary + )?; + if make_notifications { if finalized { operation.notify_finalized.push(hash); } - operation.notify_imported = Some(ImportSummary { - hash, - origin, - header: import_headers.into_post(), - is_new_best, - storage_changes, - retracted, - }) + operation.notify_imported = Some(import_summary); } + Ok(ImportResult::imported(is_new_best)) } @@ -971,6 +982,20 @@ impl Client where Ok(()) } + fn notify_any_block_imported(&self, notify_import: &ImportSummary) -> sp_blockchain::Result<()> { + let notification = AllBlocksNotification:: { + hash: notify_import.hash.clone(), + origin: notify_import.origin.clone(), + header: notify_import.header.clone(), + is_new_best: notify_import.is_new_best, + }; + + self.all_blocks_notification_sinks.lock() + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + + Ok(()) + } + /// Attempts to revert the chain by `n` blocks guaranteeing that no block is /// reverted past the last finalized block. Returns the number of blocks /// that were successfully reverted. @@ -1775,6 +1800,12 @@ where stream } + fn all_blocks_notification_stream(&self) -> AllBlocksNotifications { + let (sink, stream) = mpsc::unbounded(); + self.all_blocks_notification_sinks.lock().push(sink); + stream + } + /// Get storage changes event stream. fn storage_changes_notification_stream( &self, diff --git a/client/transaction-pool/src/lib.rs b/client/transaction-pool/src/lib.rs index 29d9a4fc1db36..b7df14f6c8c82 100644 --- a/client/transaction-pool/src/lib.rs +++ b/client/transaction-pool/src/lib.rs @@ -465,6 +465,10 @@ impl MaintainedTransactionPool for BasicPool } }.boxed() } + ChainEvent::BlockReceived{ .. } => { + // nop, except for initial block sync, NewBlock handles this case + Box::pin(ready(())) + } } } } diff --git a/primitives/transaction-pool/src/pool.rs b/primitives/transaction-pool/src/pool.rs index cad06679647e4..f6b01ccd23636 100644 --- a/primitives/transaction-pool/src/pool.rs +++ b/primitives/transaction-pool/src/pool.rs @@ -256,6 +256,17 @@ pub enum ChainEvent { /// List of retracted blocks ordered by block number. retracted: Vec, }, + + /// A block has been received and is added to the chain + BlockReceived { + /// Header of the just imported block. + header: B::Header, + /// Is this the new best block. + is_new_best: bool, + /// A block event cause by an initial sync of a node. + is_initial_sync: bool, + }, + /// An existing block has been finalized. Finalized { /// Hash of just finalized block