Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Addressing review grumbles.
  • Loading branch information
tomusdrw committed Jul 27, 2020
commit b01c3858bedafdeb3810ae92c54a8eea0fdbded1
1 change: 0 additions & 1 deletion core-client/transports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ jsonrpc-http-server = { version = "15.0", path = "../../http" }
jsonrpc-ipc-server = { version = "15.0", path = "../../ipc" }
lazy_static = "1.0"
env_logger = "0.7"
tokio = "0.1"

[badges]
travis-ci = { repository = "paritytech/jsonrpc", branch = "master" }
41 changes: 18 additions & 23 deletions core-client/transports/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
use failure::{format_err, Fail};
use jsonrpc_core::futures::channel::{mpsc, oneshot};
use jsonrpc_core::futures::{
self, future,
self,
task::{Context, Poll},
Future, Stream, StreamExt, TryFutureExt,
Stream, StreamExt,
};
use jsonrpc_core::{Error, Params};
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -183,22 +183,18 @@ impl From<RpcChannel> for RawClient {

impl RawClient {
/// Call RPC method with raw JSON.
pub fn call_method(&self, method: &str, params: Params) -> impl Future<Output = RpcResult<Value>> {
pub async fn call_method(&self, method: &str, params: Params) -> RpcResult<Value> {
let (sender, receiver) = oneshot::channel();
let msg = CallMessage {
method: method.into(),
params,
sender,
};
match self.0.send(msg.into()) {
Ok(()) => future::Either::Left(async move {
match receiver.await {
Ok(v) => v,
Err(e) => Err(RpcError::Other(e.into())),
}
}),
Err(error) => future::Either::Right(future::ready(Err(RpcError::Other(error.into())))),
}
let () = self.0.send(msg.into())
.map_err(|e| RpcError::Other(e.into()))?;

receiver.await
.map_err(|e| RpcError::Other(e.into()))?
}

/// Send RPC notification with raw JSON.
Expand Down Expand Up @@ -256,31 +252,30 @@ impl TypedClient {
}

/// Call RPC with serialization of request and deserialization of response.
pub fn call_method<T: Serialize, R: DeserializeOwned + 'static>(
pub async fn call_method<T: Serialize, R: DeserializeOwned>(
&self,
method: &str,
returns: &'static str,
returns: &str,
args: T,
) -> impl Future<Output = RpcResult<R>> {
) -> RpcResult<R> {
let args =
serde_json::to_value(args).expect("Only types with infallible serialisation can be used for JSON-RPC");
let params = match args {
Value::Array(vec) => Params::Array(vec),
Value::Null => Params::None,
Value::Object(map) => Params::Map(map),
_ => {
return future::Either::Left(future::ready(Err(RpcError::Other(format_err!(
return Err(RpcError::Other(format_err!(
"RPC params should serialize to a JSON array, JSON object or null"
)))))
)))
}
};

future::Either::Right(self.0.call_method(method, params).and_then(move |value: Value| {
log::debug!("response: {:?}", value);
let result =
serde_json::from_value::<R>(value).map_err(|error| RpcError::ParseError(returns.into(), error.into()));
future::ready(result)
}))
let value: Value = self.0.call_method(method, params).await?;
log::debug!("response: {:?}", value);

serde_json::from_value::<R>(value)
.map_err(|error| RpcError::ParseError(returns.into(), error.into()))
}

/// Call RPC with serialization of request only.
Expand Down
1 change: 0 additions & 1 deletion core/src/calls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ impl<T: Metadata> fmt::Debug for RemoteProcedure<T> {
}
}

// TODO [ToDr] Check all the bounds
impl<F: Send + Sync + 'static, X: Send + 'static> RpcMethodSimple for F
where
F: Fn(Params) -> X,
Expand Down
5 changes: 1 addition & 4 deletions core/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,7 @@ impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
let jsonrpc = method.jsonrpc;
let valid_version = self.compatibility.is_version_valid(jsonrpc);

let call_method = |method: &Arc<dyn RpcMethod<T>>| {
// TODO [ToDr] lazy here?
method.call(params, meta)
};
let call_method = |method: &Arc<dyn RpcMethod<T>>| method.call(params, meta);

let result = match (valid_version, self.methods.get(&method.method)) {
(false, _) => Err(Error::invalid_version()),
Expand Down
1 change: 0 additions & 1 deletion derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,4 @@ jsonrpc-pubsub = { version = "15.0", path = "../pubsub" }
jsonrpc-tcp-server = { version = "15.0", path = "../tcp" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = "0.2"
trybuild = "1.0"
2 changes: 1 addition & 1 deletion server-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ version = "15.0.0"

[dependencies]
bytes = "0.4"
futures = "0.1"
futures01 = { version = "0.1", package = "futures" }
globset = "0.4"
jsonrpc-core = { version = "15.0", path = "../core" }
lazy_static = "1.1.0"
Expand Down
8 changes: 4 additions & 4 deletions server-utils/src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use std::io;
use tokio;

use futures::{self, Future};
use futures01::Future;

/// Possibly uninitialized event loop executor.
#[derive(Debug)]
Expand Down Expand Up @@ -83,7 +83,7 @@ impl Executor {
#[derive(Debug)]
pub struct RpcEventLoop {
executor: tokio::runtime::TaskExecutor,
close: Option<futures::Complete<()>>,
close: Option<futures01::Complete<()>>,
handle: Option<tokio::runtime::Shutdown>,
}

Expand All @@ -101,7 +101,7 @@ impl RpcEventLoop {

/// Spawns a new named thread with the `EventLoop`.
pub fn with_name(name: Option<String>) -> io::Result<Self> {
let (stop, stopped) = futures::oneshot();
let (stop, stopped) = futures01::oneshot();

let mut tb = tokio::runtime::Builder::new();
tb.core_threads(1);
Expand All @@ -112,7 +112,7 @@ impl RpcEventLoop {

let mut runtime = tb.build()?;
let executor = runtime.executor();
let terminate = futures::empty().select(stopped).map(|_| ()).map_err(|_| ());
let terminate = futures01::empty().select(stopped).map(|_| ()).map_err(|_| ());
runtime.spawn(terminate);
let handle = runtime.shutdown_on_idle();

Expand Down
3 changes: 2 additions & 1 deletion tcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ repository = "https://github.com/paritytech/jsonrpc"
version = "15.0.0"

[dependencies]
futures = "0.1"
futures01 = { version = "0.1", package = "futures" }
# TODO remove when we no longer need compat (use jsonrpc-core re-export instead)
futures03 = { version = "0.3", features = ["compat"], package = "futures" }
jsonrpc-core = { version = "15.0", path = "../core" }
jsonrpc-server-utils = { version = "15.0", path = "../server-utils" }
Expand Down
2 changes: 1 addition & 1 deletion tcp/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::net::SocketAddr;
use std::sync::Arc;

use crate::jsonrpc::futures::{self as futures03, channel::mpsc, StreamExt};
use futures::{Async, Poll, Stream};
use futures01::{Async, Poll, Stream};

use parking_lot::Mutex;

Expand Down
4 changes: 2 additions & 2 deletions tcp/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::sync::Arc;

use tokio_service::Service as TokioService;

use futures::sync::oneshot;
use futures::{future, Future, Sink, Stream};
use futures01::sync::oneshot;
use futures01::{future, Future, Sink, Stream};

use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware};
use crate::server_utils::{codecs, reactor, tokio, tokio_codec::Framed, SuspendableStream};
Expand Down
2 changes: 1 addition & 1 deletion tcp/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;

use crate::jsonrpc::futures::FutureExt;
use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware};
use futures::Future;
use futures01::Future;

pub struct Service<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
handler: Arc<MetaIoHandler<M, S>>,
Expand Down
4 changes: 2 additions & 2 deletions tcp/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use futures::{future, Future};
use futures01::{future, Future};
use jsonrpc_core::{MetaIoHandler, Metadata, Value};

use crate::server_utils::tokio::{self, io, net::TcpStream, timer::Delay};
Expand Down Expand Up @@ -71,7 +71,7 @@ fn disconnect() {
}

fn dummy_request(addr: &SocketAddr, data: Vec<u8>) -> Vec<u8> {
let (ret_tx, ret_rx) = futures::sync::oneshot::channel();
let (ret_tx, ret_rx) = futures01::sync::oneshot::channel();

let stream = TcpStream::connect(addr)
.and_then(move |stream| io::write_all(stream, data))
Expand Down