Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Rework to handler middleware
  • Loading branch information
grbIzl committed Sep 16, 2020
commit 791cdf860097c58b7f42459fe42daf2dd79df350
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions client/rpc-servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ description = "Substrate RPC servers."
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
futures = "~0.1.6"
jsonrpc-core = "14.2.0"
pubsub = { package = "jsonrpc-pubsub", version = "14.2.0" }
log = "0.4.8"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-rc6"}
serde = "1.0.101"
serde_json = "1.0.41"
sp-runtime = { version = "2.0.0-rc6", path = "../../primitives/runtime" }
Expand All @@ -23,6 +25,4 @@ sp-runtime = { version = "2.0.0-rc6", path = "../../primitives/runtime" }
http = { package = "jsonrpc-http-server", version = "14.2.0" }
hyper = "0.12"
ipc = { version = "14.2.0", package = "jsonrpc-ipc-server" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-rc6"}
ws-core = { package = "ws", version = "0.9" }
ws = { package = "jsonrpc-ws-server", version = "14.2.0" }
24 changes: 11 additions & 13 deletions client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

#![warn(missing_docs)]

mod middleware;

use std::io;
use jsonrpc_core::IoHandlerExtension;
use jsonrpc_core::{IoHandlerExtension, MetaIoHandler};
use log::error;
use pubsub::PubSubMetadata;

#[cfg(not(target_os = "unknown"))]
mod middleware;
use prometheus_endpoint::Registry;

/// Maximal payload accepted by RPC servers.
const MAX_PAYLOAD: usize = 15 * 1024 * 1024;
Expand All @@ -35,15 +35,19 @@ const MAX_PAYLOAD: usize = 15 * 1024 * 1024;
const WS_MAX_CONNECTIONS: usize = 100;

/// The RPC IoHandler containing all requested APIs.
pub type RpcHandler<T> = pubsub::PubSubHandler<T>;
pub type RpcHandler<T> = pubsub::PubSubHandler<T, RpcMiddleware>;

pub use self::inner::*;
pub use middleware::RpcMiddleware;

/// Construct rpc `IoHandler`
pub fn rpc_handler<M: PubSubMetadata>(
extension: impl IoHandlerExtension<M>
extension: impl IoHandlerExtension<M>,
metrics_registry: Option<&Registry>,
) -> RpcHandler<M> {
let mut io = pubsub::PubSubHandler::default();

let io_handler = MetaIoHandler::with_middleware(RpcMiddleware::new(metrics_registry));
let mut io = pubsub::PubSubHandler::new(io_handler);
extension.augment(&mut io);

// add an endpoint to list all available methods.
Expand All @@ -63,8 +67,6 @@ pub fn rpc_handler<M: PubSubMetadata>(

#[cfg(not(target_os = "unknown"))]
mod inner {
use middleware::{HttpRpcMiddleware, WSRpcMiddleware};
use prometheus_endpoint::Registry;
use super::*;

/// Type alias for ipc server
Expand All @@ -81,7 +83,6 @@ mod inner {
addr: &std::net::SocketAddr,
cors: Option<&Vec<String>>,
io: RpcHandler<M>,
metrics_registry: Option<&Registry>,
) -> io::Result<http::Server> {
http::ServerBuilder::new(io)
.threads(4)
Expand All @@ -94,7 +95,6 @@ mod inner {
})
.cors(map_cors::<http::AccessControlAllowOrigin>(cors))
.max_request_body_size(MAX_PAYLOAD)
.request_middleware(HttpRpcMiddleware::new(metrics_registry))
.start_http(addr)
}

Expand Down Expand Up @@ -123,14 +123,12 @@ mod inner {
max_connections: Option<usize>,
cors: Option<&Vec<String>>,
io: RpcHandler<M>,
metrics_registry: Option<&Registry>,
) -> io::Result<ws::Server> {
ws::ServerBuilder::with_meta_extractor(io, |context: &ws::RequestContext| context.sender().into())
.max_payload(MAX_PAYLOAD)
.max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS))
.allowed_origins(map_cors(cors))
.allowed_hosts(hosts_filtering(cors.is_some()))
.request_middleware(WSRpcMiddleware::new(metrics_registry))
.start(addr)
.map_err(|err| match err {
ws::Error::Io(io) => io,
Expand Down
94 changes: 32 additions & 62 deletions client/rpc-servers/src/middleware.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// This file is part of Substrate.

// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd.
// Copyright (C) 2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
Expand All @@ -16,86 +16,56 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Middlewares for RPC servers.
//! Middleware for RPC requests.

use http::{RequestMiddleware as HttpRequestMiddleware, RequestMiddlewareAction as HttpRequestMiddlewareAction};
use hyper::Body;
use jsonrpc_core::{Middleware as RequestMiddleware, Metadata, Request, Response, FutureResponse, FutureOutput};
use prometheus_endpoint::{Registry, Counter, PrometheusError, register, U64};
use ws::{RequestMiddleware as WSRequestMiddleware, MiddlewareAction as WSRequestMiddlewareAction};

struct HTTPMetrics {
http_rpc_calls: Counter<U64>,
}

impl HTTPMetrics {
fn register(r: &Registry) -> Result<Self, PrometheusError> {
Ok(HTTPMetrics {
http_rpc_calls: register(Counter::new(
"rpc_http_calls_total",
"Number of rpc calls received through http interface",
)?, r)?,
})
}
}

/// Middleware for RPC calls over HTTP
pub struct HttpRpcMiddleware {
metrics: Option<HTTPMetrics>,
}
use futures::{future::Either, Future};

impl HttpRpcMiddleware {
pub fn new(metrics_registry: Option<&Registry>) -> Self {
HttpRpcMiddleware {
metrics: metrics_registry.and_then(|r| HTTPMetrics::register(r).ok()),
}
}
#[derive(Debug)]
struct RpcMetrics {
rpc_calls: Counter<U64>,
}

impl HttpRequestMiddleware for HttpRpcMiddleware {
fn on_request(&self, request: hyper::Request<Body>) -> HttpRequestMiddlewareAction {
if let Some(ref metrics) = self.metrics {
metrics.http_rpc_calls.inc();
}
HttpRequestMiddlewareAction::Proceed {
should_continue_on_invalid_cors: false,
request,
}
}
}

struct WSMetrics {
ws_rpc_calls: Counter<U64>,
}

impl WSMetrics {
impl RpcMetrics {
fn register(r: &Registry) -> Result<Self, PrometheusError> {
Ok(WSMetrics {
ws_rpc_calls: register(Counter::new(
"rpc_ws_calls_total",
"Number of rpc calls received through web socket interface",
Ok(RpcMetrics {
rpc_calls: register(Counter::new(
"rpc_calls_total",
"Number of rpc calls received",
)?, r)?,
})
}
}

/// Middleware for RPC calls over web sockets
pub struct WSRpcMiddleware {
metrics: Option<WSMetrics>,
/// Middleware for RPC calls
pub struct RpcMiddleware {
metrics: Option<RpcMetrics>,
}

impl WSRpcMiddleware {
impl RpcMiddleware {
/// Create an instance of middleware
pub fn new(metrics_registry: Option<&Registry>) -> Self {
WSRpcMiddleware {
metrics: metrics_registry.and_then(|r| WSMetrics::register(r).ok()),
RpcMiddleware {
metrics: metrics_registry.and_then(|r| RpcMetrics::register(r).ok()),
}
}
}

impl WSRequestMiddleware for WSRpcMiddleware {
fn process(&self, _req: &ws_core::Request) -> WSRequestMiddlewareAction {
impl<M: Metadata> RequestMiddleware<M> for RpcMiddleware {
type Future = FutureResponse;
type CallFuture = FutureOutput;

fn on_request<F, X>(&self, request: Request, meta: M, next: F) -> Either<FutureResponse, X>
where
F: Fn(Request, M) -> X + Send + Sync,
X: Future<Item = Option<Response>, Error = ()> + Send + 'static,
{
if let Some(ref metrics) = self.metrics {
metrics.ws_rpc_calls.inc();
metrics.rpc_calls.inc();
}
WSRequestMiddlewareAction::Proceed

Either::B(next(request, meta))
}
}
}
23 changes: 13 additions & 10 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
rpc_extensions_builder: &(dyn RpcExtensionBuilder<Output = TRpc> + Send),
offchain_storage: Option<<TBackend as sc_client_api::backend::Backend<TBl>>::OffchainStorage>,
system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<TBl>>
) -> jsonrpc_pubsub::PubSubHandler<sc_rpc::Metadata>
) -> sc_rpc_server::RpcHandler<sc_rpc::Metadata>
where
TBl: BlockT,
TCl: ProvideRuntimeApi<TBl> + BlockchainEvents<TBl> + HeaderBackend<TBl> +
Expand Down Expand Up @@ -746,15 +746,18 @@ fn gen_handler<TBl, TBackend, TExPool, TRpc, TCl>(
offchain::OffchainApi::to_delegate(offchain)
});

sc_rpc_server::rpc_handler((
state::StateApi::to_delegate(state),
state::ChildStateApi::to_delegate(child_state),
chain::ChainApi::to_delegate(chain),
maybe_offchain_rpc,
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions_builder.build(deny_unsafe, task_executor),
))
sc_rpc_server::rpc_handler(
(
state::StateApi::to_delegate(state),
state::ChildStateApi::to_delegate(child_state),
chain::ChainApi::to_delegate(chain),
maybe_offchain_rpc,
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions_builder.build(deny_unsafe, task_executor),
),
config.prometheus_registry()
)
}

/// Parameters to pass into `build_network`.
Expand Down
6 changes: 2 additions & 4 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl<T> MallocSizeOfWasm for T {}

/// RPC handlers that can perform RPC queries.
#[derive(Clone)]
pub struct RpcHandlers(Arc<jsonrpc_core::MetaIoHandler<sc_rpc::Metadata>>);
pub struct RpcHandlers(Arc<jsonrpc_core::MetaIoHandler<sc_rpc::Metadata, sc_rpc_server::RpcMiddleware>>);

impl RpcHandlers {
/// Starts an RPC query.
Expand All @@ -118,7 +118,7 @@ impl RpcHandlers {
}

/// Provides access to the underlying `MetaIoHandler`
pub fn io_handler(&self) -> Arc<jsonrpc_core::MetaIoHandler<sc_rpc::Metadata>> {
pub fn io_handler(&self) -> Arc<jsonrpc_core::MetaIoHandler<sc_rpc::Metadata, sc_rpc_server::RpcMiddleware>> {
self.0.clone()
}
}
Expand Down Expand Up @@ -421,7 +421,6 @@ fn start_rpc_servers<H: FnMut(sc_rpc::DenyUnsafe) -> sc_rpc_server::RpcHandler<s
address,
config.rpc_cors.as_ref(),
gen_handler(deny_unsafe(&address, &config.rpc_methods)),
config.prometheus_registry(),
),
)?.map(|s| waiting::HttpServer(Some(s))),
maybe_start_server(
Expand All @@ -431,7 +430,6 @@ fn start_rpc_servers<H: FnMut(sc_rpc::DenyUnsafe) -> sc_rpc_server::RpcHandler<s
config.rpc_ws_max_connections,
config.rpc_cors.as_ref(),
gen_handler(deny_unsafe(&address, &config.rpc_methods)),
config.prometheus_registry(),
),
)?.map(|s| waiting::WsServer(Some(s))),
)))
Expand Down