-
Notifications
You must be signed in to change notification settings - Fork 2.7k
feat/blocks: add a stream for all blocks #5455
Conversation
Includes sync blocks too.
d9c1e4b to
0c8037d
Compare
tomusdrw
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks OK, but given the use cases we have in mind (Indexing OCW) I believe we should implement something more sophisticated, that gives us more guarantees about execution (state availability + persistence). I've drafted an idea in one of the comments.
| storage_notifications: Mutex<StorageNotifications<Block>>, | ||
| import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<BlockImportNotification<Block>>>>, | ||
| finality_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<FinalityNotification<Block>>>>, | ||
| all_blocks_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<AllBlocksNotification<Block>>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm quite hesitant on adding another Unbounded channel here. This may become very unreliable, imagine a case where:
- You have some non-trivial work to do on every block
- The channel queues up a bunch of blocks, as you are not as fast processing them.
- The state get's pruned in the meantime.
The whole scheme is then useless, cause even when we receive the notification the state is already lost. Also if the node is turned off the whole content of the channel is lost, so we will essentially miss running for some blocks.
If we want to make it super reliable and make sure that EACH and EVERY block is indexed/processed we need to come up with a more sophisticated mechanism.
I imagine something like LMAX-Disruptor scheme, where we:
- Persist the state of indexing in some DB
- Pin the state, up until the slower consumer processes it.
- Proceed as fast as possible with fast consumers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm quite hesitant on adding another Unbounded channel here.
Same here.
| operation.op.insert_aux(aux)?; | ||
|
|
||
|
|
||
| let header = import_headers.into_post(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be worth to construct this lazily for all_blocks. Also I'd consider passing less data to all_blocks notification for performance reasons.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Passing less data is imho not possible since header is required in https://github.com/paritytech/substrate/pull/5455/files#diff-240c17a1e4dbce4ad137b7777bf03ce4R919-R924 which in turn requires header to be extracted from import_headers - correct me if I am wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case there can be many all-blocks-notification-streams one could pass an Arc<AllBlocksNotification> down the channels instead of a clone for each channel within notify_any_block_imported.
We do similar things for the NetworkService::event_stream where we pass a Bytes instead of a Vec<u8>.
|
What's the intended usage for this stream? |
Co-Authored-By: Tomasz Drwięga <[email protected]>
Co-Authored-By: Tomasz Drwięga <[email protected]>
Co-Authored-By: Tomasz Drwięga <[email protected]>
Co-Authored-By: Tomasz Drwięga <[email protected]>
|
Will be deferred to later point of time :) |
Provides a stream that guarantees to trigger on every block, imported, synced.