Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
c8b0907
PoC async rpc client
niklasad1 Nov 30, 2021
a45bf14
add client example should be removed from this repo
niklasad1 Nov 30, 2021
c2a9c75
fmt
niklasad1 Nov 30, 2021
1fb8c8d
cargo fmt
niklasad1 Nov 30, 2021
212d79f
subxt client tests
niklasad1 Nov 30, 2021
1a3b5a9
cargo fmt
niklasad1 Nov 30, 2021
f9860e7
fix some nits
niklasad1 Dec 1, 2021
f21ee1a
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-core-c…
niklasad1 Dec 1, 2021
a59db8d
try nightly for all CI jobs
jsdw Dec 2, 2021
c5b3104
need wasm also for CI
jsdw Dec 2, 2021
de869e8
wasm for nightly run too
jsdw Dec 2, 2021
fedf6bf
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-core-c…
niklasad1 Dec 3, 2021
8e6b277
client: add missing features
niklasad1 Dec 3, 2021
752c19e
update jsonrpsee
niklasad1 Dec 21, 2021
2bfc389
hacky
niklasad1 Dec 21, 2021
bca1473
hacky update jsonrpsee
niklasad1 Dec 22, 2021
e2a93e3
use jsonrpsee crates.io release
niklasad1 Dec 22, 2021
32555aa
ci: pin nightly 2021-12-15
niklasad1 Jan 4, 2022
8d449c3
pin nightly to 2021-12-15
niklasad1 Jan 4, 2022
bb10358
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-core-c…
niklasad1 Jan 4, 2022
937e9a6
fix build
niklasad1 Jan 4, 2022
1551e70
fmt
niklasad1 Jan 4, 2022
65fb71a
compile please
niklasad1 Jan 4, 2022
c988c4a
rewrite me
niklasad1 Jan 20, 2022
8dc7dda
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-core-c…
niklasad1 Feb 2, 2022
ae24d3b
fixes
niklasad1 Feb 2, 2022
fa3b905
fixes
niklasad1 Feb 2, 2022
f603d68
pre-generate metadata
niklasad1 Feb 3, 2022
d433828
Merge remote-tracking branch 'origin/na-jsonrpsee-core-client' into n…
niklasad1 Feb 3, 2022
a5df40a
fix nit
niklasad1 Feb 3, 2022
a09e8a8
get rid of needless deps
niklasad1 Feb 3, 2022
b53ae9f
remove embedded client
niklasad1 Feb 3, 2022
b9bc61f
Update Cargo.toml
niklasad1 Feb 3, 2022
b21fde8
Update subxt/Cargo.toml
niklasad1 Feb 3, 2022
4053e30
Update subxt/Cargo.toml
niklasad1 Feb 3, 2022
eed8238
Update subxt/src/client.rs
niklasad1 Feb 3, 2022
5182b95
Update subxt/src/rpc.rs
niklasad1 Feb 3, 2022
9887b39
Update test-runtime/build.rs
niklasad1 Feb 3, 2022
73623c1
cargo fmt
niklasad1 Feb 3, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
hacky
  • Loading branch information
niklasad1 committed Dec 21, 2021
commit 2bfc3898b75eade39d8d7ea7b848c749ec49f033
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ chameleon = "0.1.0"
scale-info = { version = "1.0.0", features = ["bit-vec"] }
futures = "0.3.13"
hex = "0.4.3"
jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee/", branch = "master", features = ["macros", "core-client", "client", "client-ws-transport"] }
jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee/", branch = "jsonrpsee-clients-expose-tls-feature", features = ["macros", "core-client", "client", "client-ws-transport"] }
log = "0.4.14"
num-traits = { version = "0.2.14", default-features = false }
serde = { version = "1.0.124", features = ["derive"] }
Expand Down
9 changes: 4 additions & 5 deletions examples/submit_and_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn simple_transfer() -> Result<(), Box<dyn std::error::Error>> {
.sign_and_submit_then_watch(&signer)
.await?
.wait_for_finalized_success()
.await?;
.await.unwrap()?;

let transfer_event =
balance_transfer.find_first_event::<polkadot::balances::events::Transfer>()?;
Expand Down Expand Up @@ -93,7 +93,7 @@ async fn simple_transfer_separate_events() -> Result<(), Box<dyn std::error::Err
.sign_and_submit_then_watch(&signer)
.await?
.wait_for_finalized()
.await?;
.await.unwrap()?;

// Now we know it's been finalized, we can get hold of a couple of
// details, including events. Calling `wait_for_finalized_success` is
Expand Down Expand Up @@ -144,8 +144,9 @@ async fn handle_transfer_events() -> Result<(), Box<dyn std::error::Error>> {
.sign_and_submit_then_watch(&signer)
.await?;

while let Some(ev) = balance_transfer_progress.next().await? {
loop {
use subxt::TransactionStatus::*;
let ev = balance_transfer_progress.next().await.unwrap()?;

// Made it into a block, but not finalized.
if let InBlock(details) = ev {
Expand Down Expand Up @@ -191,6 +192,4 @@ async fn handle_transfer_events() -> Result<(), Box<dyn std::error::Error>> {
println!("Current transaction status: {:?}", ev);
}
}

Ok(())
}
125 changes: 0 additions & 125 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,99 +451,6 @@ impl<T: Config> Rpc<T> {
Ok(subscription)
}

/// Create and submit an extrinsic and return corresponding Event if successful
pub async fn submit_and_watch_extrinsic<'a, E: Encode + 'static>(
&self,
extrinsic: E,
decoder: &'a EventsDecoder<T>,
) -> Result<ExtrinsicSuccess<T>, Error> {
let ext_hash = T::Hashing::hash_of(&extrinsic);
log::info!("Submitting Extrinsic `{:?}`", ext_hash);

let events_sub = if self.accept_weak_inclusion {
self.subscribe_events().await
} else {
self.subscribe_finalized_events().await
}?;
let mut xt_sub = self.watch_extrinsic(extrinsic).await?;

while let Some(status) = xt_sub.next().await {
log::info!("Received status {:?}", status);
let status = status?;
match status {
TransactionStatus::Future
| TransactionStatus::Ready
| TransactionStatus::Broadcast(_)
| TransactionStatus::Retracted(_) => continue,
TransactionStatus::InBlock(block_hash) => {
if self.accept_weak_inclusion {
return self
.process_block(events_sub, decoder, block_hash, ext_hash)
.await
}
continue
}
TransactionStatus::Invalid => return Err("Extrinsic Invalid".into()),
TransactionStatus::Usurped(_) => return Err("Extrinsic Usurped".into()),
TransactionStatus::Dropped => return Err("Extrinsic Dropped".into()),
TransactionStatus::Finalized(block_hash) => {
// read finalized blocks by default
return self
.process_block(events_sub, decoder, block_hash, ext_hash)
.await
}
TransactionStatus::FinalityTimeout(_) => {
return Err("Extrinsic FinalityTimeout".into())
}
}
}
Err(RpcError::Custom("RPC subscription dropped".into()).into())
}

async fn process_block(
&self,
events_sub: EventStorageSubscription<T>,
decoder: &EventsDecoder<T>,
block_hash: T::Hash,
ext_hash: T::Hash,
) -> Result<ExtrinsicSuccess<T>, Error> {
log::info!("Fetching block {:?}", block_hash);
if let Some(signed_block) = self.block(Some(block_hash)).await? {
log::info!(
"Found block {:?}, with {} extrinsics",
block_hash,
signed_block.block.extrinsics.len()
);
let ext_index = signed_block
.block
.extrinsics
.iter()
.position(|ext| {
let hash = T::Hashing::hash_of(ext);
hash == ext_hash
})
.ok_or_else(|| {
Error::Other(format!(
"Failed to find Extrinsic with hash {:?}",
ext_hash,
))
})?;
let mut sub = EventSubscription::new(events_sub, decoder);
sub.filter_extrinsic(block_hash, ext_index);
let mut events = vec![];
while let Some(event) = sub.next().await {
events.push(event?);
}
Ok(ExtrinsicSuccess {
block: block_hash,
extrinsic: ext_hash,
events,
})
} else {
Err(format!("Failed to find block {:?}", block_hash).into())
}
}

/// Insert a key into the keystore.
pub async fn insert_key(
&self,
Expand Down Expand Up @@ -587,38 +494,6 @@ impl<T: Config> Rpc<T> {
}
}

/// Captures data for when an extrinsic is successfully included in a block
#[derive(Debug)]
pub struct ExtrinsicSuccess<T: Config> {
/// Block hash.
pub block: T::Hash,
/// Extrinsic hash.
pub extrinsic: T::Hash,
/// Raw runtime events, can be decoded by the caller.
pub events: Vec<RawEvent>,
}

impl<T: Config> ExtrinsicSuccess<T> {
/// Find the Event for the given module/variant, with raw encoded event data.
/// Returns `None` if the Event is not found.
pub fn find_event_raw(&self, module: &str, variant: &str) -> Option<&RawEvent> {
self.events
.iter()
.find(|raw| raw.pallet == module && raw.variant == variant)
}

/// Find the Event for the given module/variant, attempting to decode the event data.
/// Returns `None` if the Event is not found.
/// Returns `Err` if the data fails to decode into the supplied type.
pub fn find_event<E: Event>(&self) -> Result<Option<E>, CodecError> {
if let Some(event) = self.find_event_raw(E::PALLET, E::EVENT) {
Ok(Some(E::decode(&mut &event.data[..])?))
} else {
Ok(None)
}
}
}

/// Build WS RPC client from URL
pub async fn build_ws_client(url: &str) -> Result<RpcClient, RpcError> {
let (sender, receiver) = ws_transport(url).await?;
Expand Down
75 changes: 40 additions & 35 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ use crate::{
Config,
Phase,
};
use jsonrpsee::types::{
Error as RpcError,
Subscription as RpcSubscription,
};
use jsonrpsee::core::client::Subscription as RpcSubscription;

/// This struct represents a subscription to the progress of some transaction, and is
/// returned from [`crate::SubmittableExtrinsic::sign_and_submit_then_watch()`].
Expand All @@ -58,16 +55,16 @@ impl<'client, T: Config> TransactionProgress<'client, T> {
}

/// Return the next transaction status when it's emitted.
pub async fn next(&mut self) -> Result<Option<TransactionStatus<'client, T>>, Error> {
pub async fn next(&mut self) -> Option<Result<TransactionStatus<'client, T>, Error>> {
// Return `None` if the subscription has been dropped:
let sub = match &mut self.sub {
Some(sub) => sub,
None => return Ok(None),
None => return None,
};

// Return the next item otherwise:
let res = sub.next().await?;
Ok(res.map(|status| {
Some(res.map(|status| {
match status {
SubstrateTransactionStatus::Future => TransactionStatus::Future,
SubstrateTransactionStatus::Ready => TransactionStatus::Ready,
Expand Down Expand Up @@ -111,7 +108,7 @@ impl<'client, T: Config> TransactionProgress<'client, T> {
})
}
}
}))
}).map_err(Into::into))
}

/// Wait for the transaction to be in a block (but not necessarily finalized), and return
Expand All @@ -127,22 +124,24 @@ impl<'client, T: Config> TransactionProgress<'client, T> {
/// level [`TransactionProgress::next()`] API if you'd like to handle these statuses yourself.
pub async fn wait_for_in_block(
mut self,
) -> Result<TransactionInBlock<'client, T>, Error> {
while let Some(status) = self.next().await? {
match status {
// Finalized or otherwise in a block! Return.
TransactionStatus::InBlock(s) | TransactionStatus::Finalized(s) => {
return Ok(s)
}
// Error scenarios; return the error.
TransactionStatus::FinalityTimeout(_) => {
return Err(TransactionError::FinalitySubscriptionTimeout.into())
) -> Option<Result<TransactionInBlock<'client, T>, Error>> {
loop {
match self.next().await? {
Ok(status) => match status {
// Finalized or otherwise in a block! Return.
TransactionStatus::InBlock(s) | TransactionStatus::Finalized(s) => {
return Some(Ok(s))
}
// Error scenarios; return the error.
TransactionStatus::FinalityTimeout(_) => {
return Some(Err(TransactionError::FinalitySubscriptionTimeout.into()))
}
// Ignore anything else and wait for next status event:
_ => continue,
}
// Ignore anything else and wait for next status event:
_ => continue,
Err(err) => return Some(Err(err)),
}
}
Err(RpcError::Custom("RPC subscription dropped".into()).into())
}

/// Wait for the transaction to be finalized, and return a [`TransactionInBlock`]
Expand All @@ -157,20 +156,22 @@ impl<'client, T: Config> TransactionProgress<'client, T> {
/// level [`TransactionProgress::next()`] API if you'd like to handle these statuses yourself.
pub async fn wait_for_finalized(
mut self,
) -> Result<TransactionInBlock<'client, T>, Error> {
while let Some(status) = self.next().await? {
match status {
// Finalized! Return.
TransactionStatus::Finalized(s) => return Ok(s),
// Error scenarios; return the error.
TransactionStatus::FinalityTimeout(_) => {
return Err(TransactionError::FinalitySubscriptionTimeout.into())
) -> Option<Result<TransactionInBlock<'client, T>, Error>> {
loop {
match self.next().await? {
Ok(status) => match status {
// finalized! return.
TransactionStatus::Finalized(s) => return Some(Ok(s)),
// error scenarios; return the error.
TransactionStatus::FinalityTimeout(_) => {
return Some(Err(TransactionError::FinalitySubscriptionTimeout.into()))
}
// ignore and wait for next status event:
_ => continue,
}
// Ignore and wait for next status event:
_ => continue,
Err(err) => return Some(Err(err)),
}
}
Err(RpcError::Custom("RPC subscription dropped".into()).into())
}

/// Wait for the transaction to be finalized, and for the transaction events to indicate
Expand All @@ -184,9 +185,13 @@ impl<'client, T: Config> TransactionProgress<'client, T> {
/// may well indicate with some probability that the transaction will not make it into a block,
/// there is no guarantee that this is true. Thus, we prefer to "play it safe" here. Use the lower
/// level [`TransactionProgress::next()`] API if you'd like to handle these statuses yourself.
pub async fn wait_for_finalized_success(self) -> Result<TransactionEvents<T>, Error> {
let evs = self.wait_for_finalized().await?.wait_for_success().await?;
Ok(evs)
pub async fn wait_for_finalized_success(self) -> Option<Result<TransactionEvents<T>, Error>> {
let finalized = match self.wait_for_finalized().await? {
Ok(f) => f,
Err(err) => return Some(Err(err)),
};

Some(finalized.wait_for_success().await)
}
}

Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.