Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix derive.
  • Loading branch information
tomusdrw committed Jul 21, 2020
commit c4df7be1d755d30bc981fd3ab387338e2941348d
3 changes: 2 additions & 1 deletion core-client/transports/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,9 @@ impl TypedClient {
#[cfg(test)]
mod tests {
use super::*;
use futures01::prelude::{Future as _};
use crate::transports::local;
use crate::{RpcChannel, RpcError, TypedClient};
use crate::{RpcChannel, TypedClient};
use jsonrpc_core::{self as core, IoHandler};
use jsonrpc_pubsub::{PubSubHandler, Subscriber, SubscriptionId};
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down
67 changes: 36 additions & 31 deletions core-client/transports/src/transports/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,31 @@ use hyper::{http, rt, Client, Request, Uri};
use super::RequestBuilder;

/// Create a HTTP Client
pub fn connect<TClient>(url: &str) -> impl Future<Output = RpcResult<TClient>>
pub fn connect<TClient>(url: &str) -> impl futures01::Future<Item = TClient, Error = RpcError>
where
TClient: From<RpcChannel>,
{
use futures::TryFutureExt;
let connect = connect03(url).compat();
rt::lazy(|| connect)
}

fn connect03<TClient>(url: &str) -> impl Future<Output = RpcResult<TClient>>
where
TClient: From<RpcChannel>,
{
use futures::future::ready;
use futures::future::Either::{Left, Right};

let max_parallel = 8;
let url: Uri = match url.parse() {
Ok(url) => url,
Err(e) => return Left(ready(Err(RpcError::Other(e.into())))),
Err(e) => return ready(Err(RpcError::Other(e.into()))),
};

#[cfg(feature = "tls")]
let connector = match hyper_tls::HttpsConnector::new(4) {
Ok(connector) => connector,
Err(e) => return Left(ready(Err(RpcError::Other(e.into())))),
Err(e) => return ready(Err(RpcError::Other(e.into()))),
};
#[cfg(feature = "tls")]
let client = Client::builder().build::<_, hyper::Body>(connector);
Expand Down Expand Up @@ -101,24 +109,22 @@ where
})
});

Right(futures::compat::Compat01As03::new(
rt::lazy(move || {
rt::spawn(fut.map_err(|e: RpcError| log::error!("RPC Client error: {:?}", e)));
Ok(TClient::from(sender.into()))
})
))
rt::spawn(fut.map_err(|e: RpcError| log::error!("RPC Client error: {:?}", e)));
ready(Ok(TClient::from(sender.into())))
}

#[cfg(test)]
mod tests {
use super::*;
use crate::*;
use assert_matches::assert_matches;
use crate::*;
use futures01::prelude::*;
use hyper::rt;
use jsonrpc_core::futures::{FutureExt, TryFutureExt};
use jsonrpc_core::{Error, ErrorCode, IoHandler, Params, Value};
use jsonrpc_http_server::*;
use std::net::SocketAddr;
use std::time::Duration;
use super::*;

fn id<T>(t: T) -> T {
t
Expand Down Expand Up @@ -168,7 +174,7 @@ mod tests {
fn io() -> IoHandler {
let mut io = IoHandler::default();
io.add_sync_method("hello", |params: Params| match params.parse::<(String,)>() {
Ok((msg,)) => Ok(Value::String(format!("hello {}", msg))),
Ok((msg,)) => Ok(Value::String(format!("hello {}", msg))),
_ => Ok(Value::String("world".into())),
});
io.add_sync_method("fail", |_: Params| Err(Error::new(ErrorCode::ServerError(-34))));
Expand All @@ -190,13 +196,13 @@ mod tests {
}

impl TestClient {
fn hello(&self, msg: &'static str) -> impl Future<Item = String, Error = RpcError> {
fn hello(&self, msg: &'static str) -> impl Future<Output = RpcResult<String>> {
self.0.call_method("hello", "String", (msg,))
}
fn fail(&self) -> impl Future<Item = (), Error = RpcError> {
fn fail(&self) -> impl Future<Output = RpcResult<()>> {
self.0.call_method("fail", "()", ())
}
fn notify(&self, value: u64) -> impl Future<Item = (), Error = RpcError> {
fn notify(&self, value: u64) -> RpcResult<()> {
self.0.notify("notify", (value,))
}
}
Expand All @@ -212,15 +218,15 @@ mod tests {
// when
let run = connect(&server.uri)
.and_then(|client: TestClient| {
client.hello("http").and_then(move |result| {
client.hello("http").compat().and_then(move |result| {
drop(client);
let _ = tx.send(result);
Ok(())
future::ready(Ok(()))
})
})
.map_err(|e| log::error!("RPC Client error: {:?}", e));

rt::run(run);
rt::run(run.compat());

// then
let result = rx.recv_timeout(Duration::from_secs(3)).unwrap();
Expand All @@ -238,15 +244,14 @@ mod tests {
// when
let run = connect(&server.uri)
.and_then(|client: TestClient| {
client.notify(12).and_then(move |result| {
drop(client);
let _ = tx.send(result);
Ok(())
})
let result = client.notify(12);
drop(client);
let _ = tx.send(result);
Ok(())
})
.map_err(|e| log::error!("RPC Client error: {:?}", e));

rt::run(run);
rt::run(run.compat());

// then
rx.recv_timeout(Duration::from_secs(3)).unwrap();
Expand Down Expand Up @@ -282,13 +287,13 @@ mod tests {
// when
let run = connect(&server.uri)
.and_then(|client: TestClient| {
client.fail().then(move |res| {
client.fail().compat().then(move |res| {
let _ = tx.send(res);
Ok(())
})
})
.map_err(|e| log::error!("RPC Client error: {:?}", e));
rt::run(run);
rt::run(run.compat());

// then
let res = rx.recv_timeout(Duration::from_secs(3)).unwrap();
Expand Down Expand Up @@ -319,14 +324,14 @@ mod tests {

let call = client
.and_then(|client: TestClient| {
client.hello("http").then(move |res| {
client.hello("http").compat().then(move |res| {
let _ = tx.send(res);
Ok(())
})
})
.map_err(|e| log::error!("RPC Client error: {:?}", e));

rt::run(call);
rt::run(call.compat());

// then
let res = rx.recv_timeout(Duration::from_secs(3)).unwrap();
Expand Down Expand Up @@ -366,7 +371,7 @@ mod tests {
})
.and_then(move |_| {
server.start(); // todo: make the server start on the main thread
client.hello("http2").then(move |res| {
client.hello("http2").compat().then(|res| {
let _ = tx2.send(res);
Ok(())
})
Expand All @@ -375,7 +380,7 @@ mod tests {
.map_err(|e| log::error!("RPC Client error: {:?}", e));

// when
rt::run(call);
rt::run(call.compat());

let res = rx.recv_timeout(Duration::from_secs(3)).unwrap();
assert!(res.is_err());
Expand Down
2 changes: 1 addition & 1 deletion core/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl<T: Metadata, S: Middleware<T>> MetaIoHandler<T, S> {
/// Adds new supported synchronous method.
///
/// A backward-compatible wrapper.
pub fn add_sync_method<F, X>(&mut self, name: &str, method: F)
pub fn add_sync_method<F>(&mut self, name: &str, method: F)
where
F: RpcMethodSync,
{
Expand Down
4 changes: 2 additions & 2 deletions derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ quote = "1.0.6"
proc-macro-crate = "0.1.4"

[dev-dependencies]
assert_matches = "1.3"
jsonrpc-core = { version = "14.2", path = "../core" }
jsonrpc-core-client = { version = "14.2", path = "../core-client" }
jsonrpc-pubsub = { version = "14.2", path = "../pubsub" }
jsonrpc-tcp-server = { version = "14.2", path = "../tcp" }
futures = "~0.1.6"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = "0.1"
tokio = "0.2"
trybuild = "1.0"
4 changes: 2 additions & 2 deletions derive/examples/generic-trait-bounds.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};

use jsonrpc_core::{IoHandler, IoHandlerExtension, Result, BoxFuture};
use jsonrpc_core::{IoHandler, IoHandlerExtension, Result, BoxFuture, futures::future};
use jsonrpc_derive::rpc;

// One is both parameter and a result so requires both Serialize and DeserializeOwned
Expand Down Expand Up @@ -49,7 +49,7 @@ impl Rpc<InAndOut, In, Out> for RpcImpl {
}

fn call(&self, num: InAndOut) -> BoxFuture<Result<(InAndOut, u64)>> {
jsonrpc_core::futures::future::ready(Ok((InAndOut { foo: num.foo + 999 }, num.foo)))
Box::pin(future::ready(Ok((InAndOut { foo: num.foo + 999 }, num.foo))))
}
}

Expand Down
8 changes: 4 additions & 4 deletions derive/examples/meta-macros.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;

use jsonrpc_core::types::params::Params;
use jsonrpc_core::{futures, MetaIoHandler, Metadata, Result, Value, BoxFuture};
use jsonrpc_core::futures::future;
use jsonrpc_core::{MetaIoHandler, Metadata, Result, Value, BoxFuture, Params};
use jsonrpc_derive::rpc;

#[derive(Clone)]
Expand Down Expand Up @@ -62,11 +62,11 @@ impl Rpc<u64> for RpcImpl {
}

fn call(&self, x: u64) -> BoxFuture<Result<String>> {
futures::finished(format!("OK: {}", x))
Box::pin(future::ready(Ok(format!("OK: {}", x))))
}

fn call_meta(&self, meta: Self::Metadata, map: BTreeMap<String, Value>) -> BoxFuture<Result<String>> {
futures::finished(format!("From: {}, got: {:?}", meta.0, map))
Box::pin(future::ready(Ok(format!("From: {}, got: {:?}", meta.0, map))))
}

fn notify(&self, a: u64) {
Expand Down
3 changes: 1 addition & 2 deletions derive/examples/pubsub-macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::collections::HashMap;
use std::sync::{atomic, Arc, RwLock};
use std::thread;

use jsonrpc_core::futures::Future;
use jsonrpc_core::{Error, ErrorCode, Result};
use jsonrpc_derive::rpc;
use jsonrpc_pubsub::typed;
Expand Down Expand Up @@ -70,7 +69,7 @@ fn main() {
{
let subscribers = active_subscriptions.read().unwrap();
for sink in subscribers.values() {
let _ = sink.notify(Ok("Hello World!".into())).wait();
let _ = sink.notify(Ok("Hello World!".into()));
}
}
thread::sleep(::std::time::Duration::from_secs(1));
Expand Down
21 changes: 11 additions & 10 deletions derive/examples/std.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! A simple example
#![deny(missing_docs)]
use jsonrpc_core::futures::future::{self, Future};
use jsonrpc_core::{Error, IoHandler, Result};
use jsonrpc_core::futures::{self, future, TryFutureExt};
use jsonrpc_core::{IoHandler, Result, BoxFuture};
use jsonrpc_core_client::transports::local;
use jsonrpc_derive::rpc;

Expand All @@ -18,7 +18,7 @@ pub trait Rpc {

/// Performs asynchronous operation.
#[rpc(name = "callAsync")]
fn call(&self, a: u64) -> future::Ready<Result<String, Error>>;
fn call(&self, a: u64) -> BoxFuture<Result<String>>;

/// Handles a notification.
#[rpc(name = "notify")]
Expand All @@ -36,8 +36,8 @@ impl Rpc for RpcImpl {
Ok(a + b)
}

fn call(&self, _: u64) -> future::Ready<Result<String, Error>> {
future::ready(Ok("OK".to_owned()))
fn call(&self, _: u64) -> BoxFuture<Result<String>> {
Box::pin(future::ready(Ok("OK".to_owned())))
}

fn notify(&self, a: u64) {
Expand All @@ -49,9 +49,10 @@ fn main() {
let mut io = IoHandler::new();
io.extend_with(RpcImpl.to_delegate());

let fut = {
let (client, server) = local::connect::<gen_client::Client, _, _>(io);
client.add(5, 6).map(|res| println!("5 + 6 = {}", res)).join(server)
};
fut.wait().unwrap();
let (client, server) = local::connect::<RpcClient, _, _>(io);
let fut = client.add(5, 6).map_ok(|res| println!("5 + 6 = {}", res));

futures::executor::block_on(async move {
futures::join!(fut, server)
}).0.unwrap();
}
Loading