Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ members = [
"tokens",
"tpu-client",
"transaction-dos",
"transaction-metrics-tracker",
"transaction-status",
"turbine",
"udp-client",
Expand Down Expand Up @@ -365,6 +366,7 @@ solana-test-validator = { path = "test-validator", version = "=1.17.30" }
solana-thin-client = { path = "thin-client", version = "=1.17.30" }
solana-tpu-client = { path = "tpu-client", version = "=1.17.30", default-features = false }
solana-transaction-status = { path = "transaction-status", version = "=1.17.30" }
solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=1.17.30" }
solana-turbine = { path = "turbine", version = "=1.17.30" }
solana-udp-client = { path = "udp-client", version = "=1.17.30" }
solana-version = { path = "version", version = "=1.17.30" }
Expand Down
16 changes: 16 additions & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions sdk/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ bitflags! {
/// the packet is built.
/// This field can be removed when the above feature gate is adopted by mainnet-beta.
const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000;
/// For tracking performance
const PERF_TRACK_PACKET = 0b0100_0000;
}
}

Expand Down Expand Up @@ -228,6 +230,12 @@ impl Meta {
self.flags.set(PacketFlags::TRACER_PACKET, is_tracer);
}

#[inline]
pub fn set_track_performance(&mut self, is_performance_track: bool) {
self.flags
.set(PacketFlags::PERF_TRACK_PACKET, is_performance_track);
}

#[inline]
pub fn set_simple_vote(&mut self, is_simple_vote: bool) {
self.flags.set(PacketFlags::SIMPLE_VOTE_TX, is_simple_vote);
Expand Down Expand Up @@ -261,6 +269,11 @@ impl Meta {
self.flags.contains(PacketFlags::TRACER_PACKET)
}

#[inline]
pub fn is_perf_track_packet(&self) -> bool {
self.flags.contains(PacketFlags::PERF_TRACK_PACKET)
}

#[inline]
pub fn round_compute_unit_price(&self) -> bool {
self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE)
Expand Down
2 changes: 2 additions & 0 deletions streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ quinn-proto = { workspace = true }
rand = { workspace = true }
rcgen = { workspace = true }
rustls = { workspace = true, features = ["dangerous_configuration"] }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-perf = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-metrics-tracker = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
x509-parser = { workspace = true }
Expand Down
44 changes: 43 additions & 1 deletion streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use {
quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt},
quinn_proto::VarIntBoundsExceeded,
rand::{thread_rng, Rng},
solana_measure::measure::Measure,
solana_perf::packet::{PacketBatch, PACKETS_PER_BATCH},
solana_sdk::{
packet::{Meta, PACKET_DATA_SIZE},
Expand All @@ -24,9 +25,10 @@ use {
QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO,
QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO,
},
signature::Keypair,
signature::{Keypair, Signature},
timing,
},
solana_transaction_metrics_tracker::signature_if_should_track_packet,
std::{
iter::repeat_with,
net::{IpAddr, SocketAddr, UdpSocket},
Expand Down Expand Up @@ -96,6 +98,7 @@ struct PacketChunk {
struct PacketAccumulator {
pub meta: Meta,
pub chunks: Vec<PacketChunk>,
pub start_time: Instant,
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -631,6 +634,7 @@ async fn packet_batch_sender(
trace!("enter packet_batch_sender");
let mut batch_start_time = Instant::now();
loop {
let mut packet_perf_measure: Vec<([u8; 64], std::time::Instant)> = Vec::default();
let mut packet_batch = PacketBatch::with_capacity(PACKETS_PER_BATCH);
let mut total_bytes: usize = 0;

Expand All @@ -650,6 +654,8 @@ async fn packet_batch_sender(
|| (!packet_batch.is_empty() && elapsed >= coalesce)
{
let len = packet_batch.len();
track_streamer_fetch_packet_performance(&mut packet_perf_measure, &stats);

if let Err(e) = packet_sender.send(packet_batch) {
stats
.total_packet_batch_send_err
Expand Down Expand Up @@ -695,6 +701,14 @@ async fn packet_batch_sender(

total_bytes += packet_batch[i].meta().size;

if let Some(signature) = signature_if_should_track_packet(&packet_batch[i])
.ok()
.flatten()
{
packet_perf_measure.push((*signature, packet_accumulator.start_time));
// we set the PERF_TRACK_PACKET on
packet_batch[i].meta_mut().set_track_performance(true);
}
stats
.total_chunks_processed_by_batcher
.fetch_add(num_chunks, Ordering::Relaxed);
Expand Down Expand Up @@ -732,6 +746,32 @@ fn reset_throttling_params_if_needed(last_instant: &mut tokio::time::Instant) ->
}
}

fn track_streamer_fetch_packet_performance(
packet_perf_measure: &mut [([u8; 64], Instant)],
stats: &Arc<StreamStats>,
) {
if packet_perf_measure.is_empty() {
return;
}
let mut measure = Measure::start("track_perf");
let mut process_sampled_packets_us_hist = stats.process_sampled_packets_us_hist.lock().unwrap();

for (signature, start_time) in packet_perf_measure.iter() {
let duration = Instant::now().duration_since(*start_time);
debug!(
"QUIC streamer fetch stage took {duration:?} for transaction {:?}",
Signature::from(*signature)
);
process_sampled_packets_us_hist
.increment(duration.as_micros() as u64)
.unwrap();
}
measure.stop();
stats
.perf_track_overhead_us
.fetch_add(measure.as_us(), Ordering::Relaxed);
}

async fn handle_connection(
connection: Connection,
remote_addr: SocketAddr,
Expand Down Expand Up @@ -879,6 +919,7 @@ async fn handle_chunk(
*packet_accum = Some(PacketAccumulator {
meta,
chunks: Vec::new(),
start_time: Instant::now(),
});
}

Expand Down Expand Up @@ -1468,6 +1509,7 @@ pub mod test {
offset,
end_of_chunk: size,
}],
start_time: Instant::now(),
};
ptk_sender.send(packet_accum).await.unwrap();
}
Expand Down
45 changes: 43 additions & 2 deletions streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use {
std::{
net::{IpAddr, UdpSocket},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, RwLock,
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
},
thread,
time::{Duration, SystemTime},
Expand Down Expand Up @@ -157,10 +157,19 @@ pub struct StreamStats {
pub(crate) connection_removed: AtomicUsize,
pub(crate) connection_remove_failed: AtomicUsize,
pub(crate) throttled_streams: AtomicUsize,
pub(crate) process_sampled_packets_us_hist: Mutex<histogram::Histogram>,
pub(crate) perf_track_overhead_us: AtomicU64,
}

impl StreamStats {
pub fn report(&self, name: &'static str) {
let process_sampled_packets_us_hist = {
let mut metrics = self.process_sampled_packets_us_hist.lock().unwrap();
let process_sampled_packets_us_hist = metrics.clone();
metrics.clear();
process_sampled_packets_us_hist
};

datapoint_info!(
name,
(
Expand Down Expand Up @@ -392,6 +401,38 @@ impl StreamStats {
self.throttled_streams.swap(0, Ordering::Relaxed),
i64
),
(
"process_sampled_packets_us_90pct",
process_sampled_packets_us_hist
.percentile(90.0)
.unwrap_or(0),
i64
),
(
"process_sampled_packets_us_min",
process_sampled_packets_us_hist.minimum().unwrap_or(0),
i64
),
(
"process_sampled_packets_us_max",
process_sampled_packets_us_hist.maximum().unwrap_or(0),
i64
),
(
"process_sampled_packets_us_mean",
process_sampled_packets_us_hist.mean().unwrap_or(0),
i64
),
(
"process_sampled_packets_count",
process_sampled_packets_us_hist.entries(),
i64
),
(
"perf_track_overhead_us",
self.perf_track_overhead_us.swap(0, Ordering::Relaxed),
i64
),
);
}
}
Expand Down
25 changes: 25 additions & 0 deletions transaction-metrics-tracker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "solana-transaction-metrics-tracker"
description = "Solana transaction metrics tracker"
documentation = "https://docs.rs/solana-transaction-metrics-tracker"
version = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }
publish = false

[dependencies]
Inflector = { workspace = true }
base64 = { workspace = true }
bincode = { workspace = true }
# Update this borsh dependency to the workspace version once
lazy_static = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
solana-perf = { workspace = true }
solana-sdk = { workspace = true }

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
Loading