Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix client tests.
  • Loading branch information
tomusdrw committed Jul 22, 2020
commit 59857b55c88a7c40b20fbafcb37fc4479fe6a0c6
96 changes: 44 additions & 52 deletions core-client/transports/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,9 @@ impl TypedClient {
#[cfg(test)]
mod tests {
use super::*;
use futures01::prelude::{Future as _};
use crate::transports::local;
use crate::{RpcChannel, TypedClient};
use jsonrpc_core::futures::FutureExt;
use jsonrpc_core::{self as core, IoHandler};
use jsonrpc_pubsub::{PubSubHandler, Subscriber, SubscriptionId};
use std::sync::atomic::{AtomicBool, Ordering};
Expand All @@ -351,7 +351,7 @@ mod tests {
self.0.call_method("add", "u64", (a, b))
}

fn completed(&self, success: bool) -> impl Future<Output = RpcResult<()>> {
fn completed(&self, success: bool) -> RpcResult<()> {
self.0.notify("completed", (success,))
}
}
Expand All @@ -366,69 +366,65 @@ mod tests {
Ok(jsonrpc_core::to_value(res).unwrap())
});

let (tx, rx) = std::sync::mpsc::channel();
let (client, rpc_client) = local::connect::<AddClient, _, _>(handler);
let fut = client
.clone()
.add(3, 4)
.and_then(move |res| client.add(res, 5))
.join(rpc_client)
.map(|(res, ())| {
assert_eq!(res, 12);
})
.map_err(|err| {
eprintln!("{:?}", err);
assert!(false);
});
tokio::run(fut);
let fut = async move {
let res = client.add(3, 4).await?;
let res = client.add(res, 5).await?;
assert_eq!(res, 12);
tx.send(()).unwrap();
Ok(()) as RpcResult<_>
};
let pool = futures::executor::ThreadPool::builder().pool_size(1).create().unwrap();
pool.spawn_ok(rpc_client.map(|x| x.unwrap()));
pool.spawn_ok(fut.map(|x| x.unwrap()));
rx.recv().unwrap()
}

#[test]
fn should_send_notification() {
crate::logger::init_log();
let (tx, rx) = std::sync::mpsc::sync_channel(1);
let mut handler = IoHandler::new();
handler.add_notification("completed", |params: Params| {
handler.add_notification("completed", move |params: Params| {
let (success,) = params.parse::<(bool,)>().expect("expected to receive one boolean");
assert_eq!(success, true);
tx.send(()).unwrap();
});

let (client, rpc_client) = local::connect::<AddClient, _, _>(handler);
let fut = client
.clone()
.completed(true)
.map(move |()| drop(client))
.join(rpc_client)
.map(|_| ())
.map_err(|err| {
eprintln!("{:?}", err);
assert!(false);
});
tokio::run(fut);
client.completed(true).unwrap();
let pool = futures::executor::ThreadPool::builder().pool_size(1).create().unwrap();
pool.spawn_ok(rpc_client.map(|x| x.unwrap()));
rx.recv().unwrap()
}

#[test]
fn should_handle_subscription() {
crate::logger::init_log();
// given
let (finish, finished) = std::sync::mpsc::sync_channel(1);
let mut handler = PubSubHandler::<local::LocalMeta, _>::default();
let called = Arc::new(AtomicBool::new(false));
let called2 = called.clone();
handler.add_subscription(
"hello",
("subscribe_hello", |params, _meta, subscriber: Subscriber| {
("subscribe_hello", move |params, _meta, subscriber: Subscriber| {
assert_eq!(params, core::Params::None);
let sink = subscriber
.assign_id(SubscriptionId::Number(5))
.expect("assigned subscription id");
let finish = finish.clone();
std::thread::spawn(move || {
for i in 0..3 {
std::thread::sleep(std::time::Duration::from_millis(100));
let value = serde_json::json!({
"subscription": 5,
"result": vec![i],
});
sink.notify(serde_json::from_value(value).unwrap())
.expect("sent notification");
let _ = sink.notify(serde_json::from_value(value).unwrap());
}
finish.send(()).unwrap();
});
}),
("unsubscribe_hello", move |id, _meta| {
Expand All @@ -440,34 +436,30 @@ mod tests {
);

// when
let (tx, rx) = std::sync::mpsc::channel();
let (client, rpc_client) = local::connect_with_pubsub::<TypedClient, _>(handler);
let received = Arc::new(std::sync::Mutex::new(vec![]));
let r2 = received.clone();
let fut = client
.subscribe::<_, (u32,)>("subscribe_hello", (), "hello", "unsubscribe_hello", "u32")
.and_then(|stream| {
stream
.into_future()
.map(move |(result, _)| {
drop(client);
r2.lock().unwrap().push(result.unwrap());
})
.map_err(|_| {
panic!("Expected message not received.");
})
})
.join(rpc_client)
.map(|(res, _)| {
log::info!("ok {:?}", res);
})
.map_err(|err| {
log::error!("err {:?}", err);
});
tokio::run(fut);
assert_eq!(called.load(Ordering::SeqCst), true);
let fut = async move {
let mut stream = client.subscribe::<_, (u32,)>("subscribe_hello", (), "hello", "unsubscribe_hello", "u32")?;
let result = stream.next().await;
r2.lock().unwrap().push(result.expect("Expected at least one item."));
tx.send(()).unwrap();
Ok(()) as RpcResult<_>
};

let pool = futures::executor::ThreadPool::builder().pool_size(1).create().unwrap();
pool.spawn_ok(rpc_client.map(|_| ()));
pool.spawn_ok(fut.map(|x| x.unwrap()));

rx.recv().unwrap();
assert!(
!received.lock().unwrap().is_empty(),
"Expected at least one received item."
);
// The session is being dropped only when another notification is received.
// TODO [ToDr] we should unsubscribe as soon as the stream is dropped instead!
finished.recv().unwrap();
assert_eq!(called.load(Ordering::SeqCst), true, "Unsubscribe not called.");
}
}
6 changes: 5 additions & 1 deletion core-client/transports/src/transports/duplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ impl<TSink, TStream> Duplex<TSink, TStream> {
pub fn duplex<TSink, TStream>(
sink: Pin<Box<TSink>>,
stream: Pin<Box<TStream>>,
) -> (Duplex<TSink, TStream>, RpcChannel) {
) -> (Duplex<TSink, TStream>, RpcChannel)
where
TSink: Sink<String>,
TStream: Stream<Item = String>,
{
let (sender, receiver) = mpsc::unbounded();
let client = Duplex::new(sink, stream, receiver);
(client, sender.into())
Expand Down
Loading