Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 65 additions & 18 deletions core-client/transports/src/transports/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::{
task::{Context, Poll},
Future, Sink, SinkExt, Stream, StreamExt,
};
use jsonrpc_core::{BoxFuture, MetaIoHandler, Metadata};
use jsonrpc_core::{BoxFuture, MetaIoHandler, Metadata, Middleware};
use jsonrpc_pubsub::Session;
use std::ops::Deref;
use std::pin::Pin;
Expand All @@ -26,10 +26,11 @@ enum Buffered {
None,
}

impl<TMetadata, THandler> LocalRpc<THandler, TMetadata>
impl<TMetadata, THandler, TMiddleware> LocalRpc<THandler, TMetadata>
where
TMetadata: Metadata,
THandler: Deref<Target = MetaIoHandler<TMetadata>>,
TMiddleware: Middleware<TMetadata>,
THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>>,
{
/// Creates a new `LocalRpc` with default metadata.
pub fn new(handler: THandler) -> Self
Expand All @@ -50,10 +51,11 @@ where
}
}

impl<TMetadata, THandler> Stream for LocalRpc<THandler, TMetadata>
impl<TMetadata, THandler, TMiddleware> Stream for LocalRpc<THandler, TMetadata>
where
TMetadata: Metadata + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
TMiddleware: Middleware<TMetadata> + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
{
type Item = String;

Expand All @@ -62,10 +64,11 @@ where
}
}

impl<TMetadata, THandler> LocalRpc<THandler, TMetadata>
impl<TMetadata, THandler, TMiddleware> LocalRpc<THandler, TMetadata>
where
TMetadata: Metadata + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
TMiddleware: Middleware<TMetadata> + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
{
fn poll_buffered(&mut self, cx: &mut Context) -> Poll<Result<(), RpcError>> {
let response = match self.buffered {
Expand All @@ -87,10 +90,11 @@ where
}
}

impl<TMetadata, THandler> Sink<String> for LocalRpc<THandler, TMetadata>
impl<TMetadata, THandler, TMiddleware> Sink<String> for LocalRpc<THandler, TMetadata>
where
TMetadata: Metadata + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
TMiddleware: Middleware<TMetadata> + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
{
type Error = RpcError;

Expand Down Expand Up @@ -121,14 +125,15 @@ where
}
}

/// Connects to a `Deref<Target = MetaIoHandler<Metadata>`.
pub fn connect_with_metadata<TClient, THandler, TMetadata>(
/// Connects to a `Deref<Target = MetaIoHandler<Metadata>` specifying a custom middleware implementation.
pub fn connect_with_metadata_and_middleware<TClient, THandler, TMetadata, TMiddleware>(
handler: THandler,
meta: TMetadata,
) -> (TClient, impl Future<Output = RpcResult<()>>)
where
TClient: From<RpcChannel>,
THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
TMiddleware: Middleware<TMetadata> + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
TMetadata: Metadata + Unpin,
{
let (sink, stream) = LocalRpc::with_metadata(handler, meta).split();
Expand All @@ -137,24 +142,55 @@ where
(client, rpc_client)
}

/// Connects to a `Deref<Target = MetaIoHandler<Metadata + Default>`.
pub fn connect<TClient, THandler, TMetadata>(handler: THandler) -> (TClient, impl Future<Output = RpcResult<()>>)
/// Connects to a `Deref<Target = MetaIoHandler<Metadata>`.
pub fn connect_with_metadata<TClient, THandler, TMetadata>(
handler: THandler,
meta: TMetadata,
) -> (TClient, impl Future<Output = RpcResult<()>>)
where
TClient: From<RpcChannel>,
TMetadata: Metadata + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
{
connect_with_metadata_and_middleware(handler, meta)
}

/// Connects to a `Deref<Target = MetaIoHandler<Metadata + Default>` specifying a custom middleware implementation.
pub fn connect_with_middleware<TClient, THandler, TMetadata, TMiddleware>(
handler: THandler,
) -> (TClient, impl Future<Output = RpcResult<()>>)
where
TClient: From<RpcChannel>,
TMetadata: Metadata + Default + Unpin,
TMiddleware: Middleware<TMetadata> + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata, TMiddleware>> + Unpin,
{
connect_with_metadata(handler, Default::default())
connect_with_metadata_and_middleware(handler, Default::default())
}

/// Connects to a `Deref<Target = MetaIoHandler<Metadata + Default>`.
pub fn connect<TClient, THandler, TMetadata>(
handler: THandler
) -> (TClient, impl Future<Output = RpcResult<()>>)
where
TClient: From<RpcChannel>,
TMetadata: Metadata + Default + Unpin,
THandler: Deref<Target = MetaIoHandler<TMetadata>> + Unpin,
{
connect_with_middleware(handler)
}

/// Metadata for LocalRpc.
pub type LocalMeta = Arc<Session>;

/// Connects with pubsub.
pub fn connect_with_pubsub<TClient, THandler>(handler: THandler) -> (TClient, impl Future<Output = RpcResult<()>>)
/// Connects with pubsub specifying a custom middleware implementation.
pub fn connect_with_pubsub_and_middleware<TClient, THandler, TMiddleware>(
handler: THandler,
) -> (TClient, impl Future<Output = RpcResult<()>>)
where
TClient: From<RpcChannel>,
THandler: Deref<Target = MetaIoHandler<LocalMeta>> + Unpin,
TMiddleware: Middleware<LocalMeta> + Unpin,
THandler: Deref<Target = MetaIoHandler<LocalMeta, TMiddleware>> + Unpin,
{
let (tx, rx) = mpsc::unbounded();
let meta = Arc::new(Session::new(tx));
Expand All @@ -164,3 +200,14 @@ where
let client = TClient::from(sender);
(client, rpc_client)
}

/// Connects with pubsub.
pub fn connect_with_pubsub<TClient, THandler>(
handler: THandler
) -> (TClient, impl Future<Output = RpcResult<()>>)
where
TClient: From<RpcChannel>,
THandler: Deref<Target = MetaIoHandler<LocalMeta>> + Unpin,
{
connect_with_pubsub_and_middleware(handler)
}