Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/examples/custom_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ impl Config for MyConfig {
type Address = <SubstrateConfig as Config>::Address;
type Header = <SubstrateConfig as Config>::Header;
type Signature = <SubstrateConfig as Config>::Signature;
type Extrinsic = <SubstrateConfig as Config>::Extrinsic;
// ExtrinsicParams makes use of the index type, so we need to adjust it
// too to align with our modified index type, above:
type ExtrinsicParams = SubstrateExtrinsicParams<Self>;
Expand Down
2 changes: 1 addition & 1 deletion examples/examples/rpc_call_subscribe_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

// For non-finalised blocks use `.subscribe_blocks()`
let mut blocks: Subscription<Header<u32, BlakeTwo256>> =
api.rpc().subscribe_finalized_blocks().await?;
api.rpc().subscribe_finalized_block_headers().await?;

while let Some(Ok(block)) = blocks.next().await {
println!(
Expand Down
188 changes: 188 additions & 0 deletions subxt/src/blocks/block_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright 2019-2022 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use crate::{
Config,
error::{
Error,
},
client::{
OfflineClientT,
OnlineClientT,
},
rpc::{
ChainBlockResponse,
},
events
};
use derivative::Derivative;
use sp_runtime::traits::Hash;

/// A representation of a block from which you can obtain details
/// including the block header, extrinsics and events for the block.
pub struct Block<T: Config, C> {
hash: T::Hash,
details: ChainBlockResponse<T>,
client: C,
}

impl <T, C> Block<T, C>
where
T: Config,
C: OfflineClientT<T>
{
pub (crate) fn new(
hash: T::Hash,
details: ChainBlockResponse<T>,
client: C
) -> Self {
Block {
hash, details, client
}
}

/// Return the block hash.
pub fn hash(&self) -> T::Hash {
self.hash
}

/// Return the block header.
pub fn header(&self) -> &T::Header {
&self.details.block.header
}

/// Returns an iterator over the extrinsics in the block.
pub fn extrinsics<'a>(&'a self) -> impl Iterator<Item = Extrinsic<'a, T, C>> {
self.details.block.extrinsics.iter().enumerate().map(|(idx, e)| {
Extrinsic {
index: idx as u32,
bytes: &e.0,
client: self.client.clone(),
block_hash: self.hash,
_marker: std::marker::PhantomData
}
})
}
}

/// A single extrinsic in a block.
pub struct Extrinsic<'a, T: Config, C> {
index: u32,
bytes: &'a [u8],
client: C,
block_hash: T::Hash,
_marker: std::marker::PhantomData<T>
}

impl <'a, T, C> Extrinsic<'a, T, C>
where
T: Config,
C: OfflineClientT<T>
{
/// The index of the extrinsic in the block.
pub fn index(&self) -> u32 {
self.index
}

/// The bytes of the extrinsic.
pub fn bytes(&self) -> &'a [u8] {
&self.bytes
}
}

impl <'a, T, C> Extrinsic<'a, T, C>
where
T: Config,
C: OnlineClientT<T>
{
/// The events associated with the extrinsic.
pub async fn events(&self) -> Result<ExtrinsicEvents<T>, Error> {
let ext_hash = T::Hashing::hash_of(&self.bytes);
let events = events::EventsClient::new(self.client.clone())
.at(Some(self.block_hash))
.await?;

Ok(ExtrinsicEvents::new(ext_hash, self.index, events))
}
}

/// The events associated with a given extrinsic.
#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct ExtrinsicEvents<T: Config> {
// The hash of the extrinsic (handy to expose here because
// this type is returned from TxProgress things in the most
// basic flows, so it's the only place people can access it
// without complicating things for themselves).
ext_hash: T::Hash,
// The index of the extrinsic:
idx: u32,
// All of the events in the block:
events: events::Events<T>
}

impl<T: Config> ExtrinsicEvents<T> {
pub (crate) fn new(
ext_hash: T::Hash,
idx: u32,
events: events::Events<T>
) -> Self {
Self { ext_hash, idx, events }
}

/// Return the hash of the block that the extrinsic is in.
pub fn block_hash(&self) -> T::Hash {
self.events.block_hash()
}

/// Return the hash of the extrinsic.
pub fn extrinsic_hash(&self) -> T::Hash {
self.ext_hash
}

/// Return all of the events in the block that the extrinsic is in.
pub fn all_events_in_block(&self) -> &events::Events<T> {
&self.events
}

/// Iterate over all of the raw events associated with this transaction.
///
/// This works in the same way that [`events::Events::iter()`] does, with the
/// exception that it filters out events not related to the submitted extrinsic.
pub fn iter(&self) -> impl Iterator<Item = Result<events::EventDetails, Error>> + '_ {
self.events.iter().filter(|ev| {
ev.as_ref()
.map(|ev| ev.phase() == events::Phase::ApplyExtrinsic(self.idx))
.unwrap_or(true) // Keep any errors.
})
}

/// Find all of the transaction events matching the event type provided as a generic parameter.
///
/// This works in the same way that [`events::Events::find()`] does, with the
/// exception that it filters out events not related to the submitted extrinsic.
pub fn find<Ev: events::StaticEvent>(&self) -> impl Iterator<Item = Result<Ev, Error>> + '_ {
self.iter().filter_map(|ev| {
ev.and_then(|ev| ev.as_event::<Ev>().map_err(Into::into))
.transpose()
})
}

/// Iterate through the transaction events using metadata to dynamically decode and skip
/// them, and return the first event found which decodes to the provided `Ev` type.
///
/// This works in the same way that [`events::Events::find_first()`] does, with the
/// exception that it ignores events not related to the submitted extrinsic.
pub fn find_first<Ev: events::StaticEvent>(&self) -> Result<Option<Ev>, Error> {
self.find::<Ev>().next().transpose()
}

/// Find an event in those associated with this transaction. Returns true if it was found.
///
/// This works in the same way that [`events::Events::has()`] does, with the
/// exception that it ignores events not related to the submitted extrinsic.
pub fn has<Ev: events::StaticEvent>(&self) -> Result<bool, Error> {
Ok(self.find::<Ev>().next().transpose()?.is_some())
}
}
121 changes: 92 additions & 29 deletions subxt/src/blocks/blocks_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

use crate::{
client::OnlineClientT,
error::Error,
error::{
Error,
BlockError
},
utils::PhantomDataSendSync,
Config,
};
Expand All @@ -17,6 +20,9 @@ use futures::{
};
use sp_runtime::traits::Header;
use std::future::Future;
use super::{
Block
};

/// A client for working with blocks.
#[derive(Derivative)]
Expand All @@ -41,6 +47,72 @@ where
T: Config,
Client: OnlineClientT<T>,
{
/// Obtain block details given the provided block hash, or the latest block if `None` is
/// provided.
pub fn at(
&self,
block_hash: Option<T::Hash>
) -> impl Future<Output = Result<Block<T, Client>, Error>> + Send + 'static {
let client = self.client.clone();
async move {
// If block hash is not provided, get the hash
// for the latest block and use that.
let block_hash = match block_hash {
Some(hash) => hash,
None => {
client
.rpc()
.block_hash(None)
.await?
.expect("didn't pass a block number; qed")
}
};

let res = match client.rpc().block(Some(block_hash)).await? {
Some(block) => block,
None => return Err(BlockError::BlockHashNotFound(hex::encode(block_hash)).into())
};

Ok(Block::new(block_hash, res, client))
}
}

/// Subscribe to finalized blocks.
///
/// This builds upon [`BlocksClient::subscribe_finalized_headers`] and returns details for
/// each block once it is finalized.
pub fn subscribe_finalized(
&self,
) -> impl Future<Output = Result<impl Stream<Item = Result<Block<T, Client>, Error>>, Error>>
+ Send
+ 'static {
let this = self.clone();
async move {
let client = this.client.clone();
let sub = this
.subscribe_finalized_headers()
.await?
.then(move |header| {
let client = client.clone();
async move {
let header = match header {
Ok(header) => header,
Err(e) => return Err(e)
};

let block_hash = header.hash();
let block_details = match client.rpc().block(Some(block_hash)).await? {
Some(block) => block,
None => return Err(BlockError::BlockHashNotFound(hex::encode(block_hash)).into())
};

Ok(Block::new(block_hash, block_details, client))
}
});
Ok(sub)
}
}

/// Subscribe to new best block headers.
///
/// # Note
Expand All @@ -58,47 +130,38 @@ where
+ Send
+ 'static {
let client = self.client.clone();
async move { client.rpc().subscribe_blocks().await }
async move { client.rpc().subscribe_block_headers().await }
}

/// Subscribe to finalized block headers.
///
/// While the Substrate RPC method does not guarantee that all finalized block headers are
/// provided, this function does.
/// ```
pub fn subscribe_finalized_headers(
&self,
) -> impl Future<Output = Result<impl Stream<Item = Result<T::Header, Error>>, Error>>
+ Send
+ 'static {
let client = self.client.clone();
async move { subscribe_finalized_headers(client).await }
}
}
async move {
// Fetch the last finalised block details immediately, so that we'll get
// all blocks after this one.
let last_finalized_block_hash = client.rpc().finalized_head().await?;
let last_finalized_block_num = client
.rpc()
.header(Some(last_finalized_block_hash))
.await?
.map(|h| (*h.number()).into());

async fn subscribe_finalized_headers<T, Client>(
client: Client,
) -> Result<impl Stream<Item = Result<T::Header, Error>>, Error>
where
T: Config,
Client: OnlineClientT<T>,
{
// Fetch the last finalised block details immediately, so that we'll get
// all blocks after this one.
let last_finalized_block_hash = client.rpc().finalized_head().await?;
let last_finalized_block_num = client
.rpc()
.header(Some(last_finalized_block_hash))
.await?
.map(|h| (*h.number()).into());

let sub = client.rpc().subscribe_finalized_blocks().await?;

// Adjust the subscription stream to fill in any missing blocks.
Ok(
subscribe_to_block_headers_filling_in_gaps(client, last_finalized_block_num, sub)
.boxed(),
)
let sub = client.rpc().subscribe_finalized_block_headers().await?;

// Adjust the subscription stream to fill in any missing blocks.
Ok(
subscribe_to_block_headers_filling_in_gaps(client, last_finalized_block_num, sub)
.boxed(),
)
}
}
}

/// Note: This is exposed for testing but is not considered stable and may change
Expand Down
6 changes: 6 additions & 0 deletions subxt/src/blocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@
//! This module exposes the necessary functionality for working with events.

mod blocks_client;
mod block_types;

pub use blocks_client::{
subscribe_to_block_headers_filling_in_gaps,
BlocksClient,
};
pub use block_types::{
Block,
Extrinsic,
ExtrinsicEvents,
};
Loading