diff --git a/core-client/transports/Cargo.toml b/core-client/transports/Cargo.toml index 6e19608a8..73a311bcb 100644 --- a/core-client/transports/Cargo.toml +++ b/core-client/transports/Cargo.toml @@ -36,7 +36,7 @@ ipc = [ arbitrary_precision = ["serde_json/arbitrary_precision", "jsonrpc-core/arbitrary_precision"] [dependencies] -failure = "0.1" +derive_more = "0.99" futures = { version = "0.3", features = [ "compat" ] } jsonrpc-core = { version = "15.0", path = "../../core" } jsonrpc-pubsub = { version = "15.0", path = "../../pubsub" } diff --git a/core-client/transports/src/lib.rs b/core-client/transports/src/lib.rs index c8cfd2cef..001177a33 100644 --- a/core-client/transports/src/lib.rs +++ b/core-client/transports/src/lib.rs @@ -2,7 +2,6 @@ #![deny(missing_docs)] -use failure::{format_err, Fail}; use jsonrpc_core::futures::channel::{mpsc, oneshot}; use jsonrpc_core::futures::{ self, @@ -22,20 +21,34 @@ pub mod transports; mod logger; /// The errors returned by the client. -#[derive(Debug, Fail)] +#[derive(Debug, derive_more::Display)] pub enum RpcError { /// An error returned by the server. - #[fail(display = "Server returned rpc error {}", _0)] + #[display(fmt = "Server returned rpc error {}", _0)] JsonRpcError(Error), /// Failure to parse server response. - #[fail(display = "Failed to parse server response as {}: {}", _0, _1)] - ParseError(String, failure::Error), + #[display(fmt = "Failed to parse server response as {}: {}", _0, _1)] + ParseError(String, Box), /// Request timed out. - #[fail(display = "Request timed out")] + #[display(fmt = "Request timed out")] Timeout, + /// A general client error. + #[display(fmt = "Client error: {}", _0)] + Client(String), /// Not rpc specific errors. - #[fail(display = "{}", _0)] - Other(failure::Error), + #[display(fmt = "{}", _0)] + Other(Box), +} + +impl std::error::Error for RpcError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match *self { + Self::JsonRpcError(ref e) => Some(e), + Self::ParseError(_, ref e) => Some(&**e), + Self::Other(ref e) => Some(&**e), + _ => None, + } + } } impl From for RpcError { @@ -162,7 +175,7 @@ impl Stream for TypedSubscriptionStream Some( serde_json::from_value::(value) - .map_err(|error| RpcError::ParseError(self.returns.into(), error.into())), + .map_err(|error| RpcError::ParseError(self.returns.into(), Box::new(error))), ), None => None, Some(Err(err)) => Some(Err(err.into())), @@ -192,9 +205,9 @@ impl RawClient { }; let result = self.0.send(msg.into()); async move { - let () = result.map_err(|e| RpcError::Other(e.into()))?; + let () = result.map_err(|e| RpcError::Other(Box::new(e)))?; - receiver.await.map_err(|e| RpcError::Other(e.into()))? + receiver.await.map_err(|e| RpcError::Other(Box::new(e)))? } } @@ -206,7 +219,7 @@ impl RawClient { }; match self.0.send(msg.into()) { Ok(()) => Ok(()), - Err(error) => Err(RpcError::Other(error.into())), + Err(error) => Err(RpcError::Other(Box::new(error))), } } @@ -232,7 +245,7 @@ impl RawClient { self.0 .send(msg.into()) .map(|()| receiver) - .map_err(|e| RpcError::Other(e.into())) + .map_err(|e| RpcError::Other(Box::new(e))) } } @@ -266,9 +279,9 @@ impl TypedClient { Value::Array(vec) => Ok(Params::Array(vec)), Value::Null => Ok(Params::None), Value::Object(map) => Ok(Params::Map(map)), - _ => Err(RpcError::Other(format_err!( - "RPC params should serialize to a JSON array, JSON object or null" - ))), + _ => Err(RpcError::Client( + "RPC params should serialize to a JSON array, JSON object or null".into(), + )), }; let result = params.map(|params| self.0.call_method(method, params)); @@ -277,7 +290,7 @@ impl TypedClient { log::debug!("response: {:?}", value); - serde_json::from_value::(value).map_err(|error| RpcError::ParseError(returns, error.into())) + serde_json::from_value::(value).map_err(|error| RpcError::ParseError(returns, Box::new(error))) } } @@ -289,9 +302,9 @@ impl TypedClient { Value::Array(vec) => Params::Array(vec), Value::Null => Params::None, _ => { - return Err(RpcError::Other(format_err!( - "RPC params should serialize to a JSON array, or null" - ))) + return Err(RpcError::Client( + "RPC params should serialize to a JSON array, or null".into(), + )) } }; @@ -314,9 +327,9 @@ impl TypedClient { Value::Array(vec) => Params::Array(vec), Value::Null => Params::None, _ => { - return Err(RpcError::Other(format_err!( - "RPC params should serialize to a JSON array, or null" - ))) + return Err(RpcError::Client( + "RPC params should serialize to a JSON array, or null".into(), + )) } }; diff --git a/core-client/transports/src/transports/duplex.rs b/core-client/transports/src/transports/duplex.rs index 1296cfd3d..c92cd1fe8 100644 --- a/core-client/transports/src/transports/duplex.rs +++ b/core-client/transports/src/transports/duplex.rs @@ -1,6 +1,5 @@ //! Duplex transport -use failure::format_err; use futures::channel::{mpsc, oneshot}; use futures::{ task::{Context, Poll}, @@ -195,7 +194,7 @@ where // It's a regular Req-Res call, so just answer. Some(PendingRequest::Call(tx)) => { tx.send(result) - .map_err(|_| RpcError::Other(format_err!("oneshot channel closed")))?; + .map_err(|_| RpcError::Client("oneshot channel closed".into()))?; continue; } // It was a subscription request, @@ -219,11 +218,9 @@ where ); } } else { - let err = RpcError::Other(format_err!( + let err = RpcError::Client(format!( "Subscription {:?} ({:?}) rejected: {:?}", - id, - method, - result, + id, method, result, )); if subscription.channel.unbounded_send(result).is_err() { @@ -276,7 +273,7 @@ where // Writes queued messages to sink. log::debug!("handle outgoing"); loop { - let err = || Err(RpcError::Other(failure::format_err!("closing"))); + let err = || Err(RpcError::Client("closing".into())); match self.sink.as_mut().poll_ready(cx) { Poll::Ready(Ok(())) => {} Poll::Ready(Err(_)) => return err().into(), diff --git a/core-client/transports/src/transports/http.rs b/core-client/transports/src/transports/http.rs index e5d40d1b3..b4b7cb9ae 100644 --- a/core-client/transports/src/transports/http.rs +++ b/core-client/transports/src/transports/http.rs @@ -4,7 +4,6 @@ use super::RequestBuilder; use crate::{RpcChannel, RpcError, RpcMessage, RpcResult}; -use failure::format_err; use futures::{Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use hyper::{http, rt, Client, Request, Uri}; @@ -39,13 +38,13 @@ fn do_connect(url: &str) -> impl Future> { let max_parallel = 8; let url: Uri = match url.parse() { Ok(url) => url, - Err(e) => return ready(Err(RpcError::Other(e.into()))), + Err(e) => return ready(Err(RpcError::Other(Box::new(e)))), }; #[cfg(feature = "tls")] let connector = match hyper_tls::HttpsConnector::new(4) { Ok(connector) => connector, - Err(e) => return ready(Err(RpcError::Other(e.into()))), + Err(e) => return ready(Err(RpcError::Other(Box::new(e)))), }; #[cfg(feature = "tls")] let client = Client::builder().build::<_, hyper::Body>(connector); @@ -97,16 +96,16 @@ fn do_connect(url: &str) -> impl Future> { let future = match result { Ok(ref res) if !res.status().is_success() => { log::trace!("http result status {}", res.status()); - A(future::err(RpcError::Other(format_err!( + A(future::err(RpcError::Client(format!( "Unexpected response status code: {}", res.status() )))) } Ok(res) => B(res .into_body() - .map_err(|e| RpcError::ParseError(e.to_string(), e.into())) + .map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e))) .concat2()), - Err(err) => A(future::err(RpcError::Other(err.into()))), + Err(err) => A(future::err(RpcError::Other(Box::new(err)))), }; future.then(|result| { if let Some(sender) = sender { diff --git a/core-client/transports/src/transports/local.rs b/core-client/transports/src/transports/local.rs index aee50d895..e4d91eaef 100644 --- a/core-client/transports/src/transports/local.rs +++ b/core-client/transports/src/transports/local.rs @@ -81,7 +81,7 @@ where fn send_response(&mut self) -> Result<(), RpcError> { if let Buffered::Response(r) = std::mem::replace(&mut self.buffered, Buffered::None) { - self.queue.0.start_send(r).map_err(|e| RpcError::Other(e.into()))?; + self.queue.0.start_send(r).map_err(|e| RpcError::Other(Box::new(e)))?; } Ok(()) } @@ -97,7 +97,7 @@ where fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { futures::ready!(self.poll_buffered(cx))?; futures::ready!(self.queue.0.poll_ready(cx)) - .map_err(|e| RpcError::Other(e.into())) + .map_err(|e| RpcError::Other(Box::new(e))) .into() } @@ -110,13 +110,13 @@ where fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { futures::ready!(self.poll_buffered(cx))?; futures::ready!(self.queue.0.poll_flush_unpin(cx)) - .map_err(|e| RpcError::Other(e.into())) + .map_err(|e| RpcError::Other(Box::new(e))) .into() } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { futures::ready!(self.queue.0.poll_close_unpin(cx)) - .map_err(|e| RpcError::Other(e.into())) + .map_err(|e| RpcError::Other(Box::new(e))) .into() } } diff --git a/core-client/transports/src/transports/mod.rs b/core-client/transports/src/transports/mod.rs index fdf54d74a..00b6f0600 100644 --- a/core-client/transports/src/transports/mod.rs +++ b/core-client/transports/src/transports/mod.rs @@ -82,7 +82,7 @@ pub fn parse_response( response: &str, ) -> Result<(Id, Result, Option, Option), RpcError> { jsonrpc_core::serde_from_str::(response) - .map_err(|e| RpcError::ParseError(e.to_string(), e.into())) + .map_err(|e| RpcError::ParseError(e.to_string(), Box::new(e))) .map(|response| { let id = response.id().unwrap_or(Id::Null); let sid = response.subscription_id(); diff --git a/core-client/transports/src/transports/ws.rs b/core-client/transports/src/transports/ws.rs index ea3ea8cf4..d74d5df44 100644 --- a/core-client/transports/src/transports/ws.rs +++ b/core-client/transports/src/transports/ws.rs @@ -1,6 +1,5 @@ //! JSON-RPC websocket client implementation. use crate::{RpcChannel, RpcError}; -use failure::Error; use futures01::prelude::*; use log::info; use std::collections::VecDeque; @@ -11,11 +10,11 @@ use websocket::{ClientBuilder, OwnedMessage}; /// Uses an unbuffered channel to queue outgoing rpc messages. /// /// Returns `Err` if the `url` is invalid. -pub fn try_connect(url: &str) -> Result, Error> +pub fn try_connect(url: &str) -> Result, RpcError> where T: From, { - let client_builder = ClientBuilder::new(url)?; + let client_builder = ClientBuilder::new(url).map_err(|e| RpcError::Other(Box::new(e)))?; Ok(do_connect(client_builder)) } @@ -54,7 +53,7 @@ where tokio::spawn(rpc_client); sender.into() }) - .map_err(|error| RpcError::Other(error.into())) + .map_err(|error| RpcError::Other(Box::new(error))) } struct WebsocketClient { @@ -67,7 +66,7 @@ impl WebsocketClient where TSink: Sink, TStream: Stream, - TError: Into, + TError: std::error::Error + Send + 'static, { pub fn new(sink: TSink, stream: TStream) -> Self { Self { @@ -82,7 +81,7 @@ impl Sink for WebsocketClient where TSink: Sink, TStream: Stream, - TError: Into, + TError: std::error::Error + Send + 'static, { type SinkItem = String; type SinkError = RpcError; @@ -101,12 +100,14 @@ where self.queue.push_front(request); break; } - Err(error) => return Err(RpcError::Other(error.into())), + Err(error) => return Err(RpcError::Other(Box::new(error))), }, None => break, } } - self.sink.poll_complete().map_err(|error| RpcError::Other(error.into())) + self.sink + .poll_complete() + .map_err(|error| RpcError::Other(Box::new(error))) } } @@ -114,7 +115,7 @@ impl Stream for WebsocketClient where TSink: Sink, TStream: Stream, - TError: Into, + TError: std::error::Error + Send + 'static, { type Item = String; type Error = RpcError; @@ -134,7 +135,7 @@ where return Ok(Async::Ready(None)); } Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(error) => return Err(RpcError::Other(error.into())), + Err(error) => return Err(RpcError::Other(Box::new(error))), } } }