This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Changes from 1 commit
Commits
29 commits
8c9d435
jsonrpsee v0.16
niklasad1 Nov 9, 2022
7c01e06
breaking: remove old CLI configs
niklasad1 Nov 9, 2022
08a957d
Merge remote-tracking branch 'origin/master' into na-rpc-cli-break-ev…
niklasad1 Jan 3, 2023
2078002
remove patch.crates-io
niklasad1 Jan 3, 2023
bff77f4
Merge remote-tracking branch 'origin/master' into na-rpc-cli-break-ev…
niklasad1 Feb 14, 2023
17e90a7
fix bad merge
niklasad1 Feb 14, 2023
10d60cc
fix clippy
niklasad1 Feb 14, 2023
1912055
Merge remote-tracking branch 'origin/master' into na-rpc-cli-break-ev…
niklasad1 Mar 20, 2023
fb0ac35
fix bad merge
niklasad1 Mar 20, 2023
160e492
fix grumbles
niklasad1 Mar 30, 2023
e3c4d29
Merge remote-tracking branch 'origin/master' into na-rpc-cli-break-ev…
niklasad1 Mar 30, 2023
ecace8e
Update client/service/src/lib.rs
niklasad1 Mar 30, 2023
0d6760a
revert block_in_place
niklasad1 Mar 30, 2023
226a8ea
add issue link in todo
niklasad1 Mar 31, 2023
ecdedf3
Merge remote-tracking branch 'origin/master' into na-rpc-cli-break-ev…
niklasad1 Mar 31, 2023
064e52e
Merge remote-tracking branch 'origin/master' into na-rpc-cli-break-ev…
niklasad1 Apr 12, 2023
c6934d1
Update client/cli/src/config.rs
niklasad1 Apr 12, 2023
3a0b1ec
grumbles: add ipv6 loopback address
niklasad1 Apr 12, 2023
7856c92
Revert "grumbles: add ipv6 loopback address"
niklasad1 Apr 12, 2023
be0485c
remove nits
niklasad1 Apr 12, 2023
e0f1880
Merge remote-tracking branch 'origin/master' into na-rpc-cli-break-ev…
niklasad1 Apr 26, 2023
964f146
Merge remote-tracking branch 'origin/master' into na-rpc-cli-break-ev…
niklasad1 May 2, 2023
cdc97de
bump zombienet version
pepoviola May 2, 2023
c2a9a0d
adress grumbles: provide structopt default_val_t
niklasad1 May 2, 2023
b7347e1
Merge remote-tracking branch 'origin/na-rpc-cli-break-everything' int…
niklasad1 May 2, 2023
cee7789
remove duplicate from structopt
niklasad1 May 2, 2023
1d0a3b9
Merge remote-tracking branch 'origin/master' into na-rpc-cli-break-ev…
niklasad1 May 2, 2023
5b62603
bump zombienet v1.3.47
niklasad1 May 3, 2023
9f1e259
bump zombienet version
pepoviola May 3, 2023
Merge remote-tracking branch 'origin/master' into na-rpc-cli-break-everything
niklasad1 committed Jan 3, 2023
commit 08a957d21715d2faaf95fc117d0a134e60709d03
701 changes: 121 additions & 580 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion bin/node/cli/tests/common.rs
@@ -171,7 +171,10 @@ pub fn find_ws_url_from_output(read: impl Read + Send) -> (String, String) {

Some(format!("ws://{}", sock_addr))
})
.expect("We should get an address");
.unwrap_or_else(|| {
eprintln!("Observed node output:\n{}", data);
panic!("We should get a WebSocket address")
});

(ws_url, data)
}
4 changes: 2 additions & 2 deletions client/rpc-servers/Cargo.toml
@@ -18,6 +18,6 @@ log = "0.4.17"
serde_json = "1.0.85"
tokio = { version = "1.22.0", features = ["parking_lot"] }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" }
tower-http = { version = "0.3.4", features = ["full"] }
tower = { version = "0.4.13", features = ["full"] }
tower-http = { version = "0.3.4", features = ["cors"] }
tower = "0.4.13"
http = "0.2.8"
55 changes: 13 additions & 42 deletions client/rpc-servers/src/lib.rs
@@ -20,40 +20,38 @@

#![warn(missing_docs)]

pub mod middleware;

use jsonrpsee::{
server::{
middleware::proxy_get_request::ProxyGetRequestLayer, AllowHosts, ServerBuilder,
ServerHandle,
},
RpcModule,
};
use http::header::HeaderValue;
use tower_http::cors::{AllowOrigin, CorsLayer};
use std::{error::Error as StdError, net::SocketAddr};

pub use crate::middleware::RpcMetrics;
use http::header::HeaderValue;
pub use jsonrpsee::core::{
id_providers::{RandomIntegerIdProvider, RandomStringIdProvider},
traits::IdProvider,
};
use tower_http::cors::{AllowOrigin, CorsLayer};

const MEGABYTE: u32 = 1024 * 1024;

/// Maximal payload accepted by JSON-RPC servers.
pub const RPC_MAX_PAYLOAD_DEFAULT: u32 = 15 * MEGABYTE;

/// Default maximum number of connections for JSON-RPC servers.
const RPC_MAX_CONNECTIONS: u32 = 100;

/// Default maximum number subscriptions per connection for JSON-RPC servers.
const RPC_MAX_SUBS_PER_CONN: u32 = 1024;

pub mod middleware;
const MEGABYTE: u32 = 1024 * 1024;

/// Type alias for the JSON-RPC server.
pub type Server = ServerHandle;

/// Server config.
/// Maximal payload accepted by JSON-RPC servers.
pub const RPC_MAX_PAYLOAD_DEFAULT: u32 = 15 * MEGABYTE;

/// RPC server configuration.
#[derive(Debug)]
pub struct Config<'a, M: Send + Sync + 'static> {
/// Socket addresses.
@@ -78,7 +76,7 @@ pub struct Config<'a, M: Send + Sync + 'static> {
pub tokio_handle: tokio::runtime::Handle,
}

/// Start WS server listening on given address.
/// Start RPC server listening on given address.
pub async fn start_server<'a, M: Send + Sync + 'static>(
config: Config<'a, M>,
) -> Result<ServerHandle, Box<dyn StdError + Send + Sync>> {
@@ -97,22 +95,10 @@ pub async fn start_server<'a, M: Send + Sync + 'static>(

let host_filter = hosts_filtering(cors.is_some(), &addrs);

let cors = {
if let Some(cors) = cors {
let mut list = Vec::new();
for origin in cors {
list.push(HeaderValue::from_str(origin.as_str())?);
}
CorsLayer::new().allow_origin(AllowOrigin::list(list))
} else {
CorsLayer::permissive()
}
};

let middleware = tower::ServiceBuilder::new()
// Proxy `GET /health` requests to internal `system_health` method.
.layer(ProxyGetRequestLayer::new("/health", "system_health")?)
.layer(cors.clone());
.layer(try_into_cors(cors)?);

let mut builder = ServerBuilder::new()
.max_request_body_size(payload_size_or_default(max_payload_in_mb))
@@ -134,8 +120,7 @@ pub async fn start_server<'a, M: Send + Sync + 'static>(

let rpc_api = build_rpc_api(rpc_api);
let (handle, addr) = if let Some(metrics) = metrics {
let builder = builder.set_logger(metrics);
let server = builder.build(&addrs[..]).await?;
let server = builder.set_logger(metrics).build(&addrs[..]).await?;
let addr = server.local_addr();
(server.start(rpc_api)?, addr)
} else {
@@ -145,7 +130,7 @@ pub async fn start_server<'a, M: Send + Sync + 'static>(
};

log::info!(
"Running JSON-RPC server: addr={}, cors={:?}",
"Running JSON-RPC server: addr={}, allowed origins={}",
addr.map_or_else(|_| "unknown".to_string(), |a| a.to_string()),
format_cors(cors)
);
@@ -186,20 +171,6 @@ fn payload_size_or_default(size_mb: Option<usize>) -> u32 {
size_mb.map_or(RPC_MAX_PAYLOAD_DEFAULT, |mb| (mb as u32).saturating_mul(MEGABYTE))
}

fn hosts_filter(enabled: bool, addrs: &[SocketAddr]) -> AllowHosts {
if enabled {
// NOTE The listening addresses are whitelisted by default.
let mut hosts = Vec::with_capacity(addrs.len() * 2);
for addr in addrs {
hosts.push(format!("localhost:{}", addr.port()).into());
hosts.push(format!("127.0.0.1:{}", addr.port()).into());
}
AllowHosts::Only(hosts)
} else {
AllowHosts::Any
}
}

fn try_into_cors(
maybe_cors: Option<&Vec<String>>,
) -> Result<CorsLayer, Box<dyn StdError + Send + Sync>> {
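The new try_into_cors helper is declared above, but its body is cut off in this condensed view. A minimal sketch of what it plausibly does, reconstructed from the inline CORS construction it replaces earlier in this hunk (an assumption, not the committed code):

use http::header::HeaderValue;
use std::error::Error as StdError;
use tower_http::cors::{AllowOrigin, CorsLayer};

// Sketch only: reconstructed from the inline CORS block removed above.
fn try_into_cors(
    maybe_cors: Option<&Vec<String>>,
) -> Result<CorsLayer, Box<dyn StdError + Send + Sync>> {
    if let Some(cors) = maybe_cors {
        let mut list = Vec::new();
        for origin in cors {
            // Propagate an invalid origin as an error instead of panicking.
            list.push(HeaderValue::from_str(origin.as_str())?);
        }
        Ok(CorsLayer::new().allow_origin(AllowOrigin::list(list)))
    } else {
        // No explicit origins configured: fall back to a permissive policy.
        Ok(CorsLayer::permissive())
    }
}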
3 changes: 2 additions & 1 deletion client/rpc-servers/src/middleware.rs
@@ -202,9 +202,10 @@ impl Logger for RpcMetrics {
.inc();
}

fn on_response(&self, _result: &str, started_at: Self::Instant, transport: TransportProtocol) {
fn on_response(&self, result: &str, started_at: Self::Instant, transport: TransportProtocol) {
let transport_label = transport_label_str(transport);
log::trace!(target: "rpc_metrics", "[{}] on_response started_at={:?}", transport_label, started_at);
log::trace!(target: "rpc_metrics::extra", "[{}] result={:?}", transport_label, result);
self.requests_finished.with_label_values(&[transport_label]).inc();
}

3 changes: 0 additions & 3 deletions client/service/src/lib.rs
@@ -325,9 +325,6 @@ where
}
}

let (max_request_size, ws_max_response_size, http_max_response_size) =
legacy_cli_parsing(config);

let random_port = |mut addr: SocketAddr| {
addr.set_port(0);
addr
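The random_port closure retained above uses the standard trick of binding to port 0 so the operating system picks any free port. A tiny self-contained illustration of that pattern (not code from this diff):

use std::net::{SocketAddr, TcpListener};

fn main() -> std::io::Result<()> {
    let mut addr: SocketAddr = "127.0.0.1:9944".parse().unwrap();
    // Port 0 asks the operating system to assign any free port.
    addr.set_port(0);
    let listener = TcpListener::bind(addr)?;
    // The port that was actually assigned can be read back afterwards.
    println!("listening on {}", listener.local_addr()?);
    Ok(())
}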
8 changes: 3 additions & 5 deletions client/tracing/src/block/mod.rs
@@ -268,17 +268,15 @@ where
.collect();
tracing::debug!(target: "state_tracing", "Captured {} spans and {} events", spans.len(), events.len());

let response = TraceBlockResponse::BlockTrace(BlockTrace {
block_hash: block_id_as_string(id),
Ok(TraceBlockResponse::BlockTrace(BlockTrace {
block_hash: block_id_as_string(BlockId::<Block>::Hash(self.block)),
parent_hash: block_id_as_string(parent_id),
tracing_targets: targets.to_string(),
storage_keys: self.storage_keys.clone().unwrap_or_default(),
methods: self.methods.clone().unwrap_or_default(),
spans,
events,
});

Ok(response)
}))
}
}

163 changes: 107 additions & 56 deletions utils/frame/remote-externalities/src/lib.rs
@@ -45,9 +45,6 @@ use std::{
use substrate_rpc_client::{
rpc_params, ws_client, BatchRequestBuilder, ChainApi, ClientT, StateApi, WsClient,
};
use substrate_rpc_client::{
rpc_params, ws_client, BatchRequestBuilder, ChainApi, ClientT, StateApi, WsClient,
};

type KeyValue = (StorageKey, StorageData);
type TopKeyValues = Vec<KeyValue>;
@@ -402,60 +399,110 @@ where
return Ok(Default::default())
}

let mut key_values: Vec<KeyValue> = vec![];
let mut batch_success = true;
let client = self.as_online().rpc_client_cloned();
let threads = Self::threads().get();
let thread_chunk_size = (keys.len() + threads - 1) / threads;

let client = self.as_online().rpc_client();
for chunk_keys in keys.chunks(BATCH_SIZE) {
let mut batch = BatchRequestBuilder::new();
log::info!(
target: LOG_TARGET,
"Querying a total of {} keys from prefix {:?}, splitting among {} threads, {} keys per thread",
keys.len(),
HexDisplay::from(&prefix),
threads,
thread_chunk_size,
);

for key in chunk_keys.iter() {
batch
.insert("state_getStorage", rpc_params![key, at])
.map_err(|_| "Invalid batch params")?;
}
let mut handles = Vec::new();
let keys_chunked: Vec<Vec<StorageKey>> =
keys.chunks(thread_chunk_size).map(|s| s.into()).collect::<Vec<_>>();

let batch_response =
client.batch_request::<Option<StorageData>>(batch).await.map_err(|e| {
log::error!(
target: LOG_TARGET,
"failed to execute batch: {:?}. Error: {:?}",
chunk_keys.iter().map(HexDisplay::from).collect::<Vec<_>>(),
e
);
"batch failed."
})?;
enum Message {
/// This thread completed the assigned work.
Terminated,
/// The thread produced the following batch response.
Batch(Vec<(Vec<u8>, Vec<u8>)>),
/// A request from the batch failed.
BatchFailed(String),
}

assert_eq!(chunk_keys.len(), batch_response.len());
let (tx, mut rx) = mpsc::unbounded::<Message>();

for (key, maybe_value) in chunk_keys.into_iter().zip(batch_response) {
match maybe_value {
Ok(Some(v)) => {
key_values.push((key.clone(), v));
},
Ok(None) => {
log::warn!(
target: LOG_TARGET,
"key {:?} had none corresponding value.",
&key
);
key_values.push((key.clone(), StorageData(vec![])));
},
Err(e) => {
log::error!(target: LOG_TARGET, "key {:?} failed: {:?}", &key, e);
batch_success = false;
},
};
for thread_keys in keys_chunked {
let thread_client = client.clone();
let thread_sender = tx.clone();
let handle = std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
let mut thread_key_values = Vec::with_capacity(thread_keys.len());

if key_values.len() % (10 * BATCH_SIZE) == 0 {
let ratio: f64 = key_values.len() as f64 / keys_count as f64;
log::debug!(
target: LOG_TARGET,
"progress = {:.2} [{} / {}]",
ratio,
key_values.len(),
keys_count,
);
for chunk_keys in thread_keys.chunks(DEFAULT_VALUE_DOWNLOAD_BATCH) {
let mut batch = BatchRequestBuilder::new();

for key in chunk_keys.iter() {
batch
.insert("state_getStorage", rpc_params![key, at])
.map_err(|_| "Invalid batch params")
.unwrap();
}

let batch_response = rt
.block_on(thread_client.batch_request::<Option<StorageData>>(batch))
.map_err(|e| {
log::error!(
target: LOG_TARGET,
"failed to execute batch: {:?}. Error: {:?}",
chunk_keys.iter().map(HexDisplay::from).collect::<Vec<_>>(),
e
);
"batch failed."
})
.unwrap();

// Check if we got responses for all submitted requests.
assert_eq!(chunk_keys.len(), batch_response.len());

let mut batch_kv = Vec::with_capacity(chunk_keys.len());
for (key, maybe_value) in chunk_keys.into_iter().zip(batch_response) {
match maybe_value {
Ok(Some(data)) => {
thread_key_values.push((key.clone(), data.clone()));
batch_kv.push((key.clone().0, data.0));
},
Ok(None) => {
log::warn!(
target: LOG_TARGET,
"key {:?} had none corresponding value.",
&key
);
let data = StorageData(vec![]);
thread_key_values.push((key.clone(), data.clone()));
batch_kv.push((key.clone().0, data.0));
},
Err(e) => {
let reason = format!("key {:?} failed: {:?}", &key, e);
log::error!(target: LOG_TARGET, "Reason: {}", reason);
// Signal failures to the main thread, stop aggregating (key, value)
// pairs and return immediately an error.
thread_sender.unbounded_send(Message::BatchFailed(reason)).unwrap();
return Default::default()
},
};

if thread_key_values.len() % (thread_keys.len() / 10).max(1) == 0 {
let ratio: f64 =
thread_key_values.len() as f64 / thread_keys.len() as f64;
log::debug!(
target: LOG_TARGET,
"[thread = {:?}] progress = {:.2} [{} / {}]",
std::thread::current().id(),
ratio,
thread_key_values.len(),
thread_keys.len(),
);
}
}

// Send this batch to the main thread to start inserting.
thread_sender.unbounded_send(Message::Batch(batch_kv)).unwrap();
}

thread_sender.unbounded_send(Message::Terminated).unwrap();
@@ -494,11 +541,15 @@ where
}
}

if batch_success {
Ok(key_values)
} else {
Err("batch failed.")
// Ensure all threads finished execution before returning.
let keys_and_values =
handles.into_iter().flat_map(|h| h.join().unwrap()).collect::<Vec<_>>();

if batch_failed {
return Err("Batch failed.")
}

Ok(keys_and_values)
}

/// Get the values corresponding to `child_keys` at the given `prefixed_top_key`.
@@ -511,7 +562,7 @@ where
let mut child_kv_inner = vec![];
let mut batch_success = true;

for batch_child_key in child_keys.chunks(BATCH_SIZE) {
for batch_child_key in child_keys.chunks(DEFAULT_VALUE_DOWNLOAD_BATCH) {
let mut batch_request = BatchRequestBuilder::new();

for key in batch_child_key {
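To summarize the new parallel download path in this file: the key set is split across OS threads, each thread drives its own Tokio runtime to issue batched state_getStorage requests, and results or failures are streamed back to the caller over an unbounded channel before all handles are joined. A stripped-down sketch of that pattern, with placeholder names and a dummy fetch in place of the real RPC batch:

use futures::{channel::mpsc, StreamExt};

// Placeholder message type mirroring the enum introduced in the diff.
enum Message {
    Batch(Vec<(Vec<u8>, Vec<u8>)>),
    BatchFailed(String),
    Terminated,
}

fn fetch_in_parallel(
    keys: Vec<Vec<u8>>,
    threads: usize,
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, String> {
    // Split the keys as evenly as possible over the requested number of threads.
    let chunk_size = ((keys.len() + threads - 1) / threads).max(1);
    let (tx, mut rx) = mpsc::unbounded::<Message>();

    let handles: Vec<_> = keys
        .chunks(chunk_size)
        .map(|thread_keys| {
            let thread_keys = thread_keys.to_vec();
            let tx = tx.clone();
            std::thread::spawn(move || {
                // Each worker owns a Tokio runtime so it can block on async batches.
                let rt = tokio::runtime::Runtime::new().unwrap();
                let mut out = Vec::with_capacity(thread_keys.len());
                for key in thread_keys {
                    // Dummy fetch: the real code batches `state_getStorage` calls here.
                    let value = rt.block_on(async { key.clone() });
                    out.push((key, value));
                }
                tx.unbounded_send(Message::Batch(out.clone())).unwrap();
                tx.unbounded_send(Message::Terminated).unwrap();
                out
            })
        })
        .collect();
    drop(tx);

    // Drain progress and failure messages while the workers run.
    let mut failure = None;
    while let Some(msg) = futures::executor::block_on(rx.next()) {
        if let Message::BatchFailed(reason) = msg {
            failure = Some(reason);
        }
    }

    // Join every worker and flatten the per-thread results.
    let collected = handles.into_iter().flat_map(|h| h.join().unwrap()).collect();
    match failure {
        Some(reason) => Err(reason),
        None => Ok(collected),
    }
}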
1 change: 1 addition & 0 deletions utils/frame/rpc/client/src/lib.rs
@@ -46,6 +46,7 @@ pub use jsonrpsee::{
core::{
client::{ClientT, Subscription, SubscriptionClientT},
params::BatchRequestBuilder,
Error, RpcResult,
},
rpc_params,
ws_client::{WsClient, WsClientBuilder},
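With Error and RpcResult re-exported, downstream crates can name jsonrpsee's error and result types through substrate_rpc_client alone. A hypothetical usage sketch (the helper function and the error mapping are illustrative, not part of this diff):

use substrate_rpc_client::{rpc_params, ws_client, ClientT, Error, RpcResult};

// Illustrative helper: fetch the finalized head hash as a hex string while naming
// the RPC error/result types only through the substrate_rpc_client re-exports.
async fn finalized_head(url: &str) -> RpcResult<String> {
    let client = ws_client(url).await.map_err(|e| Error::Custom(e.to_string()))?;
    client.request("chain_getFinalizedHead", rpc_params![]).await
}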
You are viewing a condensed version of this merge commit.