diff --git a/client/rpc/src/chain/chain_full.rs b/client/rpc/src/chain/chain_full.rs index 4d6dd9a46446e..6c5e8c0827586 100644 --- a/client/rpc/src/chain/chain_full.rs +++ b/client/rpc/src/chain/chain_full.rs @@ -19,10 +19,14 @@ //! Blockchain API backend for full nodes. use super::{client_err, ChainBackend, Error}; -use crate::{chain::helpers, SubscriptionTaskExecutor}; +use crate::SubscriptionTaskExecutor; use std::{marker::PhantomData, sync::Arc}; -use futures::task::Spawn; +use futures::{ + stream::{self, Stream, StreamExt}, + future, + task::Spawn, +}; use jsonrpsee::ws_server::SubscriptionSink; use sc_client_api::{BlockBackend, BlockchainEvents}; use sp_blockchain::HeaderBackend; @@ -68,27 +72,104 @@ where } fn subscribe_all_heads(&self, sink: SubscriptionSink) -> Result<(), Error> { - let client = self.client.clone(); - let executor = self.executor.clone(); - - let fut = helpers::subscribe_headers(client, sink, "chain_subscribeAllHeads"); - executor.spawn_obj(Box::pin(fut).into()).map_err(|e| Error::Client(Box::new(e))) + subscribe_headers( + &self.client, + self.executor.clone(), + "chain_subscribeAllHeads", + sink, + || self.client().info().best_hash, + || { + self.client() + .import_notification_stream() + .map(|notification| notification.header) + }, + ) } fn subscribe_new_heads(&self, sink: SubscriptionSink) -> Result<(), Error> { - let client = self.client.clone(); - let executor = self.executor.clone(); - - let fut = helpers::subscribe_headers(client, sink, "chain_subscribeNewHeads"); - executor.spawn_obj(Box::pin(fut).into()).map_err(|e| Error::Client(Box::new(e))) + subscribe_headers( + &self.client, + self.executor.clone(), + "chain_subscribeNewHeads", + sink, + || self.client().info().best_hash, + || { + self.client() + .import_notification_stream() + .filter(|notification| future::ready(notification.is_new_best)) + .map(|notification| notification.header) + }, + ) } fn subscribe_finalized_heads(&self, sink: SubscriptionSink) -> Result<(), Error> { - let client = self.client.clone(); - let executor = self.executor.clone(); + subscribe_headers( + &self.client, + self.executor.clone(), + "chain_subscribeFinalizedHeads", + sink, + || self.client().info().finalized_hash, + || { + self.client() + .finality_notification_stream() + .map(|notification| notification.header) + }, + ) + } +} + +/// Subscribe to new headers. +fn subscribe_headers( + client: &Arc, + executor: SubscriptionTaskExecutor, + method: &'static str, + mut sink: SubscriptionSink, + best_block_hash: G, + stream: F, +) -> Result<(), Error> +where + Block: BlockT + 'static, + Block::Header: Unpin, + Client: HeaderBackend + 'static, + F: FnOnce() -> S, + G: FnOnce() -> Block::Hash, + S: Stream + Send + 'static, +{ + // send current head right at the start. + let maybe_header = client + .header(BlockId::Hash(best_block_hash())) + .map_err(client_err) + .and_then(|header| { + header.ok_or_else(|| Error::Other("Best header missing.".to_string())) + }) + .map_err(|e| { + log::warn!("Best header error {:?}", e); + e + }) + .ok(); + + // send further subscriptions + let stream = stream(); + + // NOTE: by the time we set up the stream there might be a new best block and so there is a risk + // that the stream has a hole in it. The alternative would be to look up the best block *after* + // we set up the stream and chain it to the stream. Consuming code would need to handle + // duplicates at the beginning of the stream though. + let fut = async move { + stream::iter(maybe_header) + .chain(stream) + .take_while(|storage| { + future::ready(sink.send(&storage).map_or_else( + |e| { + log::debug!("Could not send data to subscription: {} error: {:?}", method, e); + false + }, + |_| true, + )) + }) + .for_each(|_| future::ready(())) + .await; + }; - let fut = - helpers::subscribe_finalized_headers(client, sink, "chain_subscribeFinalizedHeads"); executor.spawn_obj(Box::pin(fut).into()).map_err(|e| Error::Client(Box::new(e))) - } } diff --git a/client/rpc/src/chain/helpers.rs b/client/rpc/src/chain/helpers.rs deleted file mode 100644 index 385947423552c..0000000000000 --- a/client/rpc/src/chain/helpers.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::sync::Arc; - -use futures::{future, StreamExt}; -use jsonrpsee::ws_server::SubscriptionSink; -use sc_client_api::BlockchainEvents; -use sp_blockchain::HeaderBackend; -use sp_runtime::{generic::BlockId, traits::Block as BlockT}; - -/// Helper to create subscriptions for `allHeads` and `newHeads`. -pub async fn subscribe_headers( - client: Arc, - mut sink: SubscriptionSink, - method: &str, -) where - Block: BlockT + 'static, - Client: HeaderBackend + BlockchainEvents + 'static, -{ - let hash = client.info().best_hash; - let best_head = match client.header(BlockId::Hash(hash)) { - Ok(head) => head, - Err(e) => { - log_err(method, e); - return - }, - }; - - if let Err(e) = sink.send(&best_head) { - log_err(method, e); - return - }; - - // NOTE: by the time we set up the stream there might be a new best block and so there is a risk - // that the stream has a hole in it. The alternative would be to look up the best block *after* - // we set up the stream and chain it to the stream. Consuming code would need to handle - // duplicates at the beginning of the stream though. - let stream = client.import_notification_stream(); - stream - .take_while(|import| { - future::ready(sink.send(&import.header).map_or_else( - |e| { - log_err(method, e); - false - }, - |_| true, - )) - }) - .for_each(|_| future::ready(())) - .await; -} - -/// Helper to create subscriptions for `finalizedHeads`. -pub async fn subscribe_finalized_headers( - client: Arc, - mut sink: SubscriptionSink, - method: &str, -) where - Block: BlockT + 'static, - Client: HeaderBackend + BlockchainEvents + 'static, -{ - let hash = client.info().finalized_hash; - let best_head = match client.header(BlockId::Hash(hash)) { - Ok(head) => head, - Err(err) => { - log_err(method, err); - return - }, - }; - - if let Err(err) = sink.send(&best_head) { - log_err(method, err); - return - }; - - // NOTE: by the time we set up the stream there might be a new best block and so there is a risk - // that the stream has a hole in it. The alternative would be to look up the best block *after* - // we set up the stream and chain it to the stream. Consuming code would need to handle - // duplicates at the beginning of the stream though. - let stream = client.finality_notification_stream(); - stream - .take_while(|import| { - future::ready(sink.send(&import.header).map_or_else( - |e| { - log_err(method, e); - false - }, - |_| true, - )) - }) - .for_each(|_| future::ready(())) - .await; -} - -fn log_err(method: &str, err: E) { - log::debug!("Could not send data to subscription: {} error: {:?}", method, err); -} diff --git a/client/rpc/src/chain/mod.rs b/client/rpc/src/chain/mod.rs index 11636283d46f2..bea26a83f424c 100644 --- a/client/rpc/src/chain/mod.rs +++ b/client/rpc/src/chain/mod.rs @@ -19,7 +19,6 @@ //! Substrate blockchain API. mod chain_full; -mod helpers; #[cfg(test)] mod tests;