Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion core/rpc/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use rpc::futures::{stream, Future, Sink, Stream};
use runtime_primitives::generic::{BlockId, SignedBlock};
use runtime_primitives::traits::{Block as BlockT, Header, NumberFor};
use runtime_version::RuntimeVersion;
use primitives::{Blake2Hasher};
use primitives::{Blake2Hasher, storage};

use subscriptions::Subscriptions;

Expand Down Expand Up @@ -68,6 +68,16 @@ build_rpc_trait! {
#[rpc(name = "chain_unsubscribeNewHead", alias = ["unsubscribe_newHead", ])]
fn unsubscribe_new_head(&self, SubscriptionId) -> RpcResult<bool>;
}

#[pubsub(name = "chain_runtimeVersion")] {
/// New runtime version subscription
#[rpc(name = "chain_subscribeRuntimeVersion")]
fn subscribe_runtime_version(&self, Self::Metadata, pubsub::Subscriber<RuntimeVersion>);

/// Unsubscribe from runtime version subscription
#[rpc(name = "chain_unsubscribeRuntimeVersion")]
fn unsubscribe_runtime_version(&self, SubscriptionId) -> RpcResult<bool>;
}
}
}

Expand Down Expand Up @@ -163,4 +173,53 @@ impl<B, E, Block> ChainApi<Block::Hash, Block::Header, NumberFor<Block>, Block::
fn unsubscribe_new_head(&self, id: SubscriptionId) -> RpcResult<bool> {
Ok(self.subscriptions.cancel(id))
}


fn subscribe_runtime_version(&self, _meta: Self::Metadata, subscriber: pubsub::Subscriber<RuntimeVersion>) {
let stream = match self.client.storage_changes_notification_stream(Some(&[storage::StorageKey(storage::well_known_keys::CODE.to_vec())])) {
Ok(stream) => stream,
Err(err) => {
let _ = subscriber.reject(error::Error::from(err).into());
return;
}
};

self.subscriptions.add(subscriber, |sink| {
let version = self.runtime_version(None.into())
.map_err(Into::into);

let client = self.client.clone();
let mut previous_version = version.clone();

let stream = stream
.map_err(|e| warn!("Error creating storage notification stream: {:?}", e))
.filter_map(move |_| {
let version = client.info().and_then(|info| {
client.runtime_version_at(&BlockId::hash(info.chain.best_hash))
})
.map_err(error::Error::from)
.map_err(Into::into);
if previous_version != version {
previous_version = version.clone();
Some(version)
} else {
None
}
});

sink
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
.send_all(
stream::iter_result(vec![Ok(version)])
.chain(stream)
)
// we ignore the resulting Stream (if the first stream is over we are unsubscribed)
.map(|_| ())
});
}


fn unsubscribe_runtime_version(&self, id: SubscriptionId) -> RpcResult<bool> {
Ok(self.subscriptions.cancel(id))
}
}