Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 3d8a025

Browse files
authored
Support the subscription of every imported block (#13372)
* Support the subscription of every import block Close #13315 * Clean up any closed block import notification sinks * Apply review suggestions * Nit * `every_block_import_notification_sinks` -> `every_import_notification_sinks` * Apply review suggestions
1 parent 9526f04 commit 3d8a025

File tree

4 files changed

+120
-31
lines changed

4 files changed

+120
-31
lines changed

client/api/src/backend.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,19 @@ pub type TransactionForSB<B, Block> = <B as StateBackend<HashFor<Block>>>::Trans
4848
/// Extracts the transaction for the given backend.
4949
pub type TransactionFor<B, Block> = TransactionForSB<StateBackendFor<B, Block>, Block>;
5050

51+
/// Describes which block import notification stream should be notified.
52+
#[derive(Debug, Clone, Copy)]
53+
pub enum ImportNotificationAction {
54+
/// Notify only when the node has synced to the tip or there is a re-org.
55+
RecentBlock,
56+
/// Notify for every single block no matter what the sync state is.
57+
EveryBlock,
58+
/// Both block import notifications above should be fired.
59+
Both,
60+
/// No block import notification should be fired.
61+
None,
62+
}
63+
5164
/// Import operation summary.
5265
///
5366
/// Contains information about the block that just got imported,
@@ -67,6 +80,8 @@ pub struct ImportSummary<Block: BlockT> {
6780
///
6881
/// If `None`, there was no re-org while importing.
6982
pub tree_route: Option<sp_blockchain::TreeRoute<Block>>,
83+
/// What notify action to take for this import.
84+
pub import_notification_action: ImportNotificationAction,
7085
}
7186

7287
/// Finalization operation summary.

client/api/src/client.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,16 @@ pub trait BlockOf {
5959

6060
/// A source of blockchain events.
6161
pub trait BlockchainEvents<Block: BlockT> {
62-
/// Get block import event stream. Not guaranteed to be fired for every
63-
/// imported block.
62+
/// Get block import event stream.
63+
///
64+
/// Not guaranteed to be fired for every imported block, only fired when the node
65+
/// has synced to the tip or there is a re-org. Use `every_import_notification_stream()`
66+
/// if you want a notification of every imported block regardless.
6467
fn import_notification_stream(&self) -> ImportNotifications<Block>;
6568

69+
/// Get a stream of every imported block.
70+
fn every_import_notification_stream(&self) -> ImportNotifications<Block>;
71+
6672
/// Get a stream of finality notifications. Not guaranteed to be fired for every
6773
/// finalized block.
6874
fn finality_notification_stream(&self) -> FinalityNotifications<Block>;

client/merkle-mountain-range/src/test_utils.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,10 @@ impl BlockchainEvents<Block> for MockClient {
265265
unimplemented!()
266266
}
267267

268+
fn every_import_notification_stream(&self) -> ImportNotifications<Block> {
269+
unimplemented!()
270+
}
271+
268272
fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
269273
self.client.lock().finality_notification_stream()
270274
}

client/service/src/client/client.rs

Lines changed: 93 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider, RecordProof};
3131
use sc_client_api::{
3232
backend::{
3333
self, apply_aux, BlockImportOperation, ClientImportOperation, FinalizeSummary, Finalizer,
34-
ImportSummary, LockImportRun, NewBlockState, StorageProvider,
34+
ImportNotificationAction, ImportSummary, LockImportRun, NewBlockState, StorageProvider,
3535
},
3636
client::{
3737
BadBlocks, BlockBackend, BlockImportNotification, BlockOf, BlockchainEvents, ClientInfo,
@@ -106,6 +106,7 @@ where
106106
executor: E,
107107
storage_notifications: StorageNotifications<Block>,
108108
import_notification_sinks: NotificationSinks<BlockImportNotification<Block>>,
109+
every_import_notification_sinks: NotificationSinks<BlockImportNotification<Block>>,
109110
finality_notification_sinks: NotificationSinks<FinalityNotification<Block>>,
110111
// Collects auxiliary operations to be performed atomically together with
111112
// block import operations.
@@ -304,19 +305,22 @@ where
304305
FinalityNotification::from_summary(summary, self.unpin_worker_sender.clone())
305306
});
306307

307-
let (import_notification, storage_changes) = match notify_imported {
308-
Some(mut summary) => {
309-
let storage_changes = summary.storage_changes.take();
310-
(
311-
Some(BlockImportNotification::from_summary(
312-
summary,
313-
self.unpin_worker_sender.clone(),
314-
)),
315-
storage_changes,
316-
)
317-
},
318-
None => (None, None),
319-
};
308+
let (import_notification, storage_changes, import_notification_action) =
309+
match notify_imported {
310+
Some(mut summary) => {
311+
let import_notification_action = summary.import_notification_action;
312+
let storage_changes = summary.storage_changes.take();
313+
(
314+
Some(BlockImportNotification::from_summary(
315+
summary,
316+
self.unpin_worker_sender.clone(),
317+
)),
318+
storage_changes,
319+
import_notification_action,
320+
)
321+
},
322+
None => (None, None, ImportNotificationAction::None),
323+
};
320324

321325
if let Some(ref notification) = finality_notification {
322326
for action in self.finality_actions.lock().iter_mut() {
@@ -353,7 +357,7 @@ where
353357
}
354358

355359
self.notify_finalized(finality_notification)?;
356-
self.notify_imported(import_notification, storage_changes)?;
360+
self.notify_imported(import_notification, import_notification_action, storage_changes)?;
357361

358362
Ok(r)
359363
};
@@ -451,6 +455,7 @@ where
451455
executor,
452456
storage_notifications: StorageNotifications::new(prometheus_registry),
453457
import_notification_sinks: Default::default(),
458+
every_import_notification_sinks: Default::default(),
454459
finality_notification_sinks: Default::default(),
455460
import_actions: Default::default(),
456461
finality_actions: Default::default(),
@@ -769,11 +774,15 @@ where
769774

770775
operation.op.insert_aux(aux)?;
771776

772-
// We only notify when we are already synced to the tip of the chain
777+
let should_notify_every_block = !self.every_import_notification_sinks.lock().is_empty();
778+
779+
// Notify when we are already synced to the tip of the chain
773780
// or if this import triggers a re-org
774-
if make_notifications || tree_route.is_some() {
781+
let should_notify_recent_block = make_notifications || tree_route.is_some();
782+
783+
if should_notify_every_block || should_notify_recent_block {
775784
let header = import_headers.into_post();
776-
if finalized {
785+
if finalized && should_notify_recent_block {
777786
let mut summary = match operation.notify_finalized.take() {
778787
Some(mut summary) => {
779788
summary.header = header.clone();
@@ -810,13 +819,24 @@ where
810819
operation.notify_finalized = Some(summary);
811820
}
812821

822+
let import_notification_action = if should_notify_every_block {
823+
if should_notify_recent_block {
824+
ImportNotificationAction::Both
825+
} else {
826+
ImportNotificationAction::EveryBlock
827+
}
828+
} else {
829+
ImportNotificationAction::RecentBlock
830+
};
831+
813832
operation.notify_imported = Some(ImportSummary {
814833
hash,
815834
origin,
816835
header,
817836
is_new_best,
818837
storage_changes,
819838
tree_route,
839+
import_notification_action,
820840
})
821841
}
822842

@@ -1012,6 +1032,7 @@ where
10121032
fn notify_imported(
10131033
&self,
10141034
notification: Option<BlockImportNotification<Block>>,
1035+
import_notification_action: ImportNotificationAction,
10151036
storage_changes: Option<(StorageCollection, ChildStorageCollection)>,
10161037
) -> sp_blockchain::Result<()> {
10171038
let notification = match notification {
@@ -1024,22 +1045,59 @@ where
10241045
// temporary leak of closed/discarded notification sinks (e.g.
10251046
// from consensus code).
10261047
self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());
1048+
1049+
self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed());
1050+
10271051
return Ok(())
10281052
},
10291053
};
10301054

1031-
if let Some(storage_changes) = storage_changes {
1032-
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
1033-
self.storage_notifications.trigger(
1034-
&notification.hash,
1035-
storage_changes.0.into_iter(),
1036-
storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())),
1037-
);
1038-
}
1055+
let trigger_storage_changes_notification = || {
1056+
if let Some(storage_changes) = storage_changes {
1057+
// TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes?
1058+
self.storage_notifications.trigger(
1059+
&notification.hash,
1060+
storage_changes.0.into_iter(),
1061+
storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())),
1062+
);
1063+
}
1064+
};
1065+
1066+
match import_notification_action {
1067+
ImportNotificationAction::Both => {
1068+
trigger_storage_changes_notification();
1069+
self.import_notification_sinks
1070+
.lock()
1071+
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
1072+
1073+
self.every_import_notification_sinks
1074+
.lock()
1075+
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
1076+
},
1077+
ImportNotificationAction::RecentBlock => {
1078+
trigger_storage_changes_notification();
1079+
self.import_notification_sinks
1080+
.lock()
1081+
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
1082+
1083+
self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed());
1084+
},
1085+
ImportNotificationAction::EveryBlock => {
1086+
self.every_import_notification_sinks
1087+
.lock()
1088+
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
1089+
1090+
self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());
1091+
},
1092+
ImportNotificationAction::None => {
1093+
// This branch is unreachable in fact because the block import notification must be
1094+
// Some(_) instead of None (it's already handled at the beginning of this function)
1095+
// at this point.
1096+
self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());
10391097

1040-
self.import_notification_sinks
1041-
.lock()
1042-
.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
1098+
self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed());
1099+
},
1100+
}
10431101

10441102
Ok(())
10451103
}
@@ -1944,6 +2002,12 @@ where
19442002
stream
19452003
}
19462004

2005+
fn every_import_notification_stream(&self) -> ImportNotifications<Block> {
2006+
let (sink, stream) = tracing_unbounded("mpsc_every_import_notification_stream", 100_000);
2007+
self.every_import_notification_sinks.lock().push(sink);
2008+
stream
2009+
}
2010+
19472011
fn finality_notification_stream(&self) -> FinalityNotifications<Block> {
19482012
let (sink, stream) = tracing_unbounded("mpsc_finality_notification_stream", 100_000);
19492013
self.finality_notification_sinks.lock().push(sink);

0 commit comments

Comments
 (0)