Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core-client/transports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ ipc = [
arbitrary_precision = ["serde_json/arbitrary_precision", "jsonrpc-core/arbitrary_precision"]

[dependencies]
failure = "0.1"
derive_more = "0.99"
futures = { version = "0.3", features = [ "compat" ] }
jsonrpc-core = { version = "15.0", path = "../../core" }
jsonrpc-pubsub = { version = "15.0", path = "../../pubsub" }
Expand Down
59 changes: 36 additions & 23 deletions core-client/transports/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#![deny(missing_docs)]

use failure::{format_err, Fail};
use jsonrpc_core::futures::channel::{mpsc, oneshot};
use jsonrpc_core::futures::{
self,
Expand All @@ -22,20 +21,34 @@ pub mod transports;
mod logger;

/// The errors returned by the client.
#[derive(Debug, Fail)]
#[derive(Debug, derive_more::Display)]
pub enum RpcError {
/// An error returned by the server.
#[fail(display = "Server returned rpc error {}", _0)]
#[display(fmt = "Server returned rpc error {}", _0)]
JsonRpcError(Error),
/// Failure to parse server response.
#[fail(display = "Failed to parse server response as {}: {}", _0, _1)]
ParseError(String, failure::Error),
#[display(fmt = "Failed to parse server response as {}: {}", _0, _1)]
ParseError(String, Box<dyn std::error::Error + Send>),
/// Request timed out.
#[fail(display = "Request timed out")]
#[display(fmt = "Request timed out")]
Timeout,
/// A general client error.
#[display(fmt = "Client error: {}", _0)]
Client(String),
/// Not rpc specific errors.
#[fail(display = "{}", _0)]
Other(failure::Error),
#[display(fmt = "{}", _0)]
Other(Box<dyn std::error::Error + Send>),
}

impl std::error::Error for RpcError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match *self {
Self::JsonRpcError(ref e) => Some(e),
Self::ParseError(_, ref e) => Some(&**e),
Self::Other(ref e) => Some(&**e),
_ => None,
}
}
}

impl From<Error> for RpcError {
Expand Down Expand Up @@ -162,7 +175,7 @@ impl<T: DeserializeOwned + Unpin + 'static> Stream for TypedSubscriptionStream<T
match result {
Some(Ok(value)) => Some(
serde_json::from_value::<T>(value)
.map_err(|error| RpcError::ParseError(self.returns.into(), error.into())),
.map_err(|error| RpcError::ParseError(self.returns.into(), Box::new(error))),
),
None => None,
Some(Err(err)) => Some(Err(err.into())),
Expand Down Expand Up @@ -192,9 +205,9 @@ impl RawClient {
};
let result = self.0.send(msg.into());
async move {
let () = result.map_err(|e| RpcError::Other(e.into()))?;
let () = result.map_err(|e| RpcError::Other(Box::new(e)))?;

receiver.await.map_err(|e| RpcError::Other(e.into()))?
receiver.await.map_err(|e| RpcError::Other(Box::new(e)))?
}
}

Expand All @@ -206,7 +219,7 @@ impl RawClient {
};
match self.0.send(msg.into()) {
Ok(()) => Ok(()),
Err(error) => Err(RpcError::Other(error.into())),
Err(error) => Err(RpcError::Other(Box::new(error))),
}
}

Expand All @@ -232,7 +245,7 @@ impl RawClient {
self.0
.send(msg.into())
.map(|()| receiver)
.map_err(|e| RpcError::Other(e.into()))
.map_err(|e| RpcError::Other(Box::new(e)))
}
}

Expand Down Expand Up @@ -266,9 +279,9 @@ impl TypedClient {
Value::Array(vec) => Ok(Params::Array(vec)),
Value::Null => Ok(Params::None),
Value::Object(map) => Ok(Params::Map(map)),
_ => Err(RpcError::Other(format_err!(
"RPC params should serialize to a JSON array, JSON object or null"
))),
_ => Err(RpcError::Client(
"RPC params should serialize to a JSON array, JSON object or null".into(),
)),
};
let result = params.map(|params| self.0.call_method(method, params));

Expand All @@ -277,7 +290,7 @@ impl TypedClient {

log::debug!("response: {:?}", value);

serde_json::from_value::<R>(value).map_err(|error| RpcError::ParseError(returns, error.into()))
serde_json::from_value::<R>(value).map_err(|error| RpcError::ParseError(returns, Box::new(error)))
}
}

Expand All @@ -289,9 +302,9 @@ impl TypedClient {
Value::Array(vec) => Params::Array(vec),
Value::Null => Params::None,
_ => {
return Err(RpcError::Other(format_err!(
"RPC params should serialize to a JSON array, or null"
)))
return Err(RpcError::Client(
"RPC params should serialize to a JSON array, or null".into(),
))
}
};

Expand All @@ -314,9 +327,9 @@ impl TypedClient {
Value::Array(vec) => Params::Array(vec),
Value::Null => Params::None,
_ => {
return Err(RpcError::Other(format_err!(
"RPC params should serialize to a JSON array, or null"
)))
return Err(RpcError::Client(
"RPC params should serialize to a JSON array, or null".into(),
))
}
};

Expand Down
11 changes: 4 additions & 7 deletions core-client/transports/src/transports/duplex.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Duplex transport

use failure::format_err;
use futures::channel::{mpsc, oneshot};
use futures::{
task::{Context, Poll},
Expand Down Expand Up @@ -195,7 +194,7 @@ where
// It's a regular Req-Res call, so just answer.
Some(PendingRequest::Call(tx)) => {
tx.send(result)
.map_err(|_| RpcError::Other(format_err!("oneshot channel closed")))?;
.map_err(|_| RpcError::Client("oneshot channel closed".into()))?;
continue;
}
// It was a subscription request,
Expand All @@ -219,11 +218,9 @@ where
);
}
} else {
let err = RpcError::Other(format_err!(
let err = RpcError::Client(format!(
"Subscription {:?} ({:?}) rejected: {:?}",
id,
method,
result,
id, method, result,
));

if subscription.channel.unbounded_send(result).is_err() {
Expand Down Expand Up @@ -276,7 +273,7 @@ where
// Writes queued messages to sink.
log::debug!("handle outgoing");
loop {
let err = || Err(RpcError::Other(failure::format_err!("closing")));
let err = || Err(RpcError::Client("closing".into()));
match self.sink.as_mut().poll_ready(cx) {
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(_)) => return err().into(),
Expand Down
11 changes: 5 additions & 6 deletions core-client/transports/src/transports/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use super::RequestBuilder;
use crate::{RpcChannel, RpcError, RpcMessage, RpcResult};
use failure::format_err;
use futures::{Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use hyper::{http, rt, Client, Request, Uri};

Expand Down Expand Up @@ -39,13 +38,13 @@ fn do_connect(url: &str) -> impl Future<Output = RpcResult<RpcChannel>> {
let max_parallel = 8;
let url: Uri = match url.parse() {
Ok(url) => url,
Err(e) => return ready(Err(RpcError::Other(e.into()))),
Err(e) => return ready(Err(RpcError::Other(Box::new(e)))),
};

#[cfg(feature = "tls")]
let connector = match hyper_tls::HttpsConnector::new(4) {
Ok(connector) => connector,
Err(e) => return ready(Err(RpcError::Other(e.into()))),
Err(e) => return ready(Err(RpcError::Other(Box::new(e)))),
};
#[cfg(feature = "tls")]
let client = Client::builder().build::<_, hyper::Body>(connector);
Expand Down Expand Up @@ -97,16 +96,16 @@ fn do_connect(url: &str) -> impl Future<Output = RpcResult<RpcChannel>> {
let future = match result {
Ok(ref res) if !res.status().is_success() => {
log::trace!("http result status {}", res.status());
A(future::err(RpcError::Other(format_err!(
A(future::err(RpcError::Client(format!(
"Unexpected response status code: {}",
res.status()
))))
}
Ok(res) => B(res
.into_body()
.map_err(|e| RpcError::ParseError(e.to_string(), e.into()))
.map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e)))
.concat2()),
Err(err) => A(future::err(RpcError::Other(err.into()))),
Err(err) => A(future::err(RpcError::Other(Box::new(err)))),
};
future.then(|result| {
if let Some(sender) = sender {
Expand Down
8 changes: 4 additions & 4 deletions core-client/transports/src/transports/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ where

fn send_response(&mut self) -> Result<(), RpcError> {
if let Buffered::Response(r) = std::mem::replace(&mut self.buffered, Buffered::None) {
self.queue.0.start_send(r).map_err(|e| RpcError::Other(e.into()))?;
self.queue.0.start_send(r).map_err(|e| RpcError::Other(Box::new(e)))?;
}
Ok(())
}
Expand All @@ -97,7 +97,7 @@ where
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
futures::ready!(self.poll_buffered(cx))?;
futures::ready!(self.queue.0.poll_ready(cx))
.map_err(|e| RpcError::Other(e.into()))
.map_err(|e| RpcError::Other(Box::new(e)))
.into()
}

Expand All @@ -110,13 +110,13 @@ where
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
futures::ready!(self.poll_buffered(cx))?;
futures::ready!(self.queue.0.poll_flush_unpin(cx))
.map_err(|e| RpcError::Other(e.into()))
.map_err(|e| RpcError::Other(Box::new(e)))
.into()
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
futures::ready!(self.queue.0.poll_close_unpin(cx))
.map_err(|e| RpcError::Other(e.into()))
.map_err(|e| RpcError::Other(Box::new(e)))
.into()
}
}
Expand Down
2 changes: 1 addition & 1 deletion core-client/transports/src/transports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub fn parse_response(
response: &str,
) -> Result<(Id, Result<Value, RpcError>, Option<String>, Option<SubscriptionId>), RpcError> {
jsonrpc_core::serde_from_str::<ClientResponse>(response)
.map_err(|e| RpcError::ParseError(e.to_string(), e.into()))
.map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e)))
.map(|response| {
let id = response.id().unwrap_or(Id::Null);
let sid = response.subscription_id();
Expand Down
21 changes: 11 additions & 10 deletions core-client/transports/src/transports/ws.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! JSON-RPC websocket client implementation.
use crate::{RpcChannel, RpcError};
use failure::Error;
use futures01::prelude::*;
use log::info;
use std::collections::VecDeque;
Expand All @@ -11,11 +10,11 @@ use websocket::{ClientBuilder, OwnedMessage};
/// Uses an unbuffered channel to queue outgoing rpc messages.
///
/// Returns `Err` if the `url` is invalid.
pub fn try_connect<T>(url: &str) -> Result<impl Future<Item = T, Error = RpcError>, Error>
pub fn try_connect<T>(url: &str) -> Result<impl Future<Item = T, Error = RpcError>, RpcError>
where
T: From<RpcChannel>,
{
let client_builder = ClientBuilder::new(url)?;
let client_builder = ClientBuilder::new(url).map_err(|e| RpcError::Other(Box::new(e)))?;
Ok(do_connect(client_builder))
}

Expand Down Expand Up @@ -54,7 +53,7 @@ where
tokio::spawn(rpc_client);
sender.into()
})
.map_err(|error| RpcError::Other(error.into()))
.map_err(|error| RpcError::Other(Box::new(error)))
}

struct WebsocketClient<TSink, TStream> {
Expand All @@ -67,7 +66,7 @@ impl<TSink, TStream, TError> WebsocketClient<TSink, TStream>
where
TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
TStream: Stream<Item = OwnedMessage, Error = TError>,
TError: Into<Error>,
TError: std::error::Error + Send + 'static,
{
pub fn new(sink: TSink, stream: TStream) -> Self {
Self {
Expand All @@ -82,7 +81,7 @@ impl<TSink, TStream, TError> Sink for WebsocketClient<TSink, TStream>
where
TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
TStream: Stream<Item = OwnedMessage, Error = TError>,
TError: Into<Error>,
TError: std::error::Error + Send + 'static,
{
type SinkItem = String;
type SinkError = RpcError;
Expand All @@ -101,20 +100,22 @@ where
self.queue.push_front(request);
break;
}
Err(error) => return Err(RpcError::Other(error.into())),
Err(error) => return Err(RpcError::Other(Box::new(error))),
},
None => break,
}
}
self.sink.poll_complete().map_err(|error| RpcError::Other(error.into()))
self.sink
.poll_complete()
.map_err(|error| RpcError::Other(Box::new(error)))
}
}

impl<TSink, TStream, TError> Stream for WebsocketClient<TSink, TStream>
where
TSink: Sink<SinkItem = OwnedMessage, SinkError = TError>,
TStream: Stream<Item = OwnedMessage, Error = TError>,
TError: Into<Error>,
TError: std::error::Error + Send + 'static,
{
type Item = String;
type Error = RpcError;
Expand All @@ -134,7 +135,7 @@ where
return Ok(Async::Ready(None));
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(error) => return Err(RpcError::Other(error.into())),
Err(error) => return Err(RpcError::Other(Box::new(error))),
}
}
}
Expand Down