streamer/src/nonblocking/quic.rs (0 additions & 5 deletions)

@@ -61,10 +61,6 @@ const CONNECTION_CLOSE_REASON_DROPPED_ENTRY: &[u8] = b"dropped";
 pub(crate) const CONNECTION_CLOSE_CODE_DISALLOWED: u32 = 2;
 pub(crate) const CONNECTION_CLOSE_REASON_DISALLOWED: &[u8] = b"disallowed";
 
-pub(crate) const CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT: u32 = 3;
-pub(crate) const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] =
-    b"exceed_max_stream_count";
-
 const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
 const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
 
@@ -412,7 +408,6 @@ pub fn get_connection_stake(
 #[derive(Debug)]
 pub(crate) enum ConnectionHandlerError {
     ConnectionAddError,
-    MaxStreamError,
 }
 
 pub(crate) fn update_open_connections_stat<S: OpaqueStreamerCounter>(
streamer/src/nonblocking/swqos.rs (73 additions & 59 deletions)

@@ -6,8 +6,7 @@ use {
         quic::{
             get_connection_stake, update_open_connections_stat, ClientConnectionTracker,
             ConnectionHandlerError, ConnectionPeerType, ConnectionTable, ConnectionTableKey,
             ConnectionTableType, CONNECTION_CLOSE_CODE_DISALLOWED,
-            CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT, CONNECTION_CLOSE_REASON_DISALLOWED,
-            CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT,
+            CONNECTION_CLOSE_REASON_DISALLOWED,
         },
         stream_throttle::{
             throttle_stream, ConnectionStreamCounter, StakedStreamLoadEMA,
@@ -48,6 +47,13 @@ pub const QUIC_MIN_STAKED_CONCURRENT_STREAMS: usize = 128;
 pub const QUIC_MAX_STAKED_CONCURRENT_STREAMS: usize = 512;
 
 pub const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: usize = 100_000;
+/// Below this RTT, we apply the legacy logic (no BDP scaling).
+/// Above this RTT, we increase the RX window and number of streams
+/// as RTT increases to preserve reasonable bandwidth.
+const REFERENCE_RTT_MS: u64 = 50;
+
+/// Above this RTT we stop scaling for BDP.
+const MAX_RTT_MS: u64 = 350;

Review thread on MAX_RTT_MS:

Reviewer: Where does this number come from? Great that we have a cap here! Is it possible to spoof your RTT when you initially set up your connection to get more streams, and then get more access to the pipe? For example, if your RTT is 20 ms but you spoof your connection to report 350 ms, how much more TPS are you going to be able to get? Maybe this is not a big issue, but I just want to make sure.

Author: 320 ms is an RTT from Australia to Portugal, which is the largest RTT I could find on this planet. 350 is that plus a bit of margin. You can of course spoof RTT and get more concurrent streams, but you would get throttled by SWQOS down to whatever your proper quota is anyway. So faking RTT is not going to bring you real TPS increases beyond what you are supposed to get based on your stake.

Reviewer: Got it! Thanks for the explanation!

Reviewer: With "no throttling below threshold" in place, there will be no throttling in SWQOS unless the overall load is high, though. It should be possible to address RTT spoofing if really needed.

Author: If someone tries to abuse this, all they will achieve is forcing the system back into SWQOS mode, which they can do easily enough without BDP scaling (as we did during tests of #9580).

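To make the bandwidth-delay-product argument concrete: a receiver can accept at most roughly (streams x per-stream window) bytes per RTT, so for a fixed stream budget the achievable rate falls as RTT grows. A standalone sketch of the scaling rule above (the per-stream window size here is a made-up number, used only to make the output readable; it is not a value from this PR):

// Illustration of why the stream budget scales with RTT between the two constants.
// STREAM_RX_WINDOW_BYTES is hypothetical, chosen only for readable output.
fn main() {
    const REFERENCE_RTT_MS: u64 = 50;
    const MAX_RTT_MS: u64 = 350;
    const STREAM_RX_WINDOW_BYTES: u64 = 12_000;
    let base_streams: u64 = 512; // e.g. QUIC_MAX_STAKED_CONCURRENT_STREAMS

    for rtt_ms in [20u64, 50, 100, 350, 500] {
        // The PR's rule: clamp RTT to [REFERENCE_RTT_MS, MAX_RTT_MS] and
        // scale the stream budget linearly with it.
        let streams =
            base_streams * rtt_ms.clamp(REFERENCE_RTT_MS, MAX_RTT_MS) / REFERENCE_RTT_MS;
        // Rough throughput ceiling: at most one window per stream in flight per RTT.
        let bytes_per_sec = streams * STREAM_RX_WINDOW_BYTES * 1000 / rtt_ms;
        println!("rtt = {rtt_ms:3} ms -> {streams:4} streams, ~{bytes_per_sec} B/s ceiling");
    }
}

Between 50 ms and 350 ms the ceiling stays constant; past the cap it degrades as 1/RTT, which is the intended trade-off.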

 #[derive(Clone)]
 pub struct SwQosConfig {
@@ -140,8 +146,12 @@ impl SwQos {
     }
 }

-fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize {
-    match peer_type {
+fn compute_max_allowed_uni_streams(
+    rtt_millis: u64,
+    peer_type: ConnectionPeerType,
+    total_stake: u64,
+) -> u32 {
+    let streams = match peer_type {
         ConnectionPeerType::Staked(peer_stake) => {
             // No checked math for f64 type. So let's explicitly check for 0 here
             if total_stake == 0 || peer_stake > total_stake {
@@ -164,7 +174,10 @@ fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u
             }
         }
         ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
-    }
+    };
+    let streams =
+        streams as u64 * rtt_millis.clamp(REFERENCE_RTT_MS, MAX_RTT_MS) / REFERENCE_RTT_MS;
+    streams.min(u32::MAX as u64) as u32
 }
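Numerically, the new tail of the function behaves as follows. A minimal sketch of just the clamp-and-scale step, with the two constants inlined (illustrative only, mirroring the code above, not a replacement for it):

// Minimal sketch of the scaling tail of compute_max_allowed_uni_streams.
fn scale_for_rtt(base_streams: u32, rtt_millis: u64) -> u32 {
    const REFERENCE_RTT_MS: u64 = 50;
    const MAX_RTT_MS: u64 = 350;
    let scaled =
        base_streams as u64 * rtt_millis.clamp(REFERENCE_RTT_MS, MAX_RTT_MS) / REFERENCE_RTT_MS;
    scaled.min(u32::MAX as u64) as u32
}

fn main() {
    assert_eq!(scale_for_rtt(512, 20), 512); // below reference RTT: legacy limit
    assert_eq!(scale_for_rtt(512, 50), 512); // at the reference: unchanged
    assert_eq!(scale_for_rtt(512, 100), 1024); // 2x the RTT -> 2x the streams
    assert_eq!(scale_for_rtt(512, 350), 3584); // capped at 350/50 = 7x
    assert_eq!(scale_for_rtt(512, 1_000), 3584); // beyond the cap: no further growth
}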

@@ -182,58 +195,51 @@ impl SwQos {
         ),
         ConnectionHandlerError,
     > {
-        if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
-            conn_context.peer_type(),
-            conn_context.total_stake,
-        ) as u64)
-        {
-            let remote_addr = connection.remote_address();
-
-            debug!(
-                "Peer type {:?}, total stake {}, max streams {} from peer {}",
-                conn_context.peer_type(),
-                conn_context.total_stake,
-                max_uni_streams.into_inner(),
-                remote_addr,
-            );
-
-            let max_connections_per_peer = match conn_context.peer_type() {
-                ConnectionPeerType::Unstaked => self.config.max_connections_per_unstaked_peer,
-                ConnectionPeerType::Staked(_) => self.config.max_connections_per_staked_peer,
-            };
-            if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l
-                .try_add_connection(
-                    ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey),
-                    remote_addr.port(),
-                    client_connection_tracker,
-                    Some(connection.clone()),
-                    conn_context.peer_type(),
-                    conn_context.last_update.clone(),
-                    max_connections_per_peer,
-                    || Arc::new(ConnectionStreamCounter::new()),
-                )
-            {
-                update_open_connections_stat(&self.stats, &connection_table_l);
-                drop(connection_table_l);
-
-                connection.set_max_concurrent_uni_streams(max_uni_streams);
-
-                Ok((last_update, cancel_connection, stream_counter))
-            } else {
-                self.stats
-                    .connection_add_failed
-                    .fetch_add(1, Ordering::Relaxed);
-                Err(ConnectionHandlerError::ConnectionAddError)
-            }
-        } else {
-            connection.close(
-                CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT.into(),
-                CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT,
-            );
-            self.stats
-                .connection_add_failed_invalid_stream_count
-                .fetch_add(1, Ordering::Relaxed);
-            Err(ConnectionHandlerError::MaxStreamError)
-        }
+        // get current RTT and limit it to MAX_RTT_MS
+        let rtt_millis = connection.rtt().as_millis() as u64;
+        let max_uni_streams = VarInt::from_u32(compute_max_allowed_uni_streams(
+            rtt_millis,
+            conn_context.peer_type(),
+            conn_context.total_stake,
+        ));
+
+        let remote_addr = connection.remote_address();
+
+        debug!(
+            "Peer type {:?}, total stake {}, max streams {} from peer {}",
+            conn_context.peer_type(),
+            conn_context.total_stake,
+            max_uni_streams.into_inner(),
+            remote_addr,
+        );
+
+        let max_connections_per_peer = match conn_context.peer_type() {
+            ConnectionPeerType::Unstaked => self.config.max_connections_per_unstaked_peer,
+            ConnectionPeerType::Staked(_) => self.config.max_connections_per_staked_peer,
+        };
+        if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l
+            .try_add_connection(
+                ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey),
+                remote_addr.port(),
+                client_connection_tracker,
+                Some(connection.clone()),
+                conn_context.peer_type(),
+                conn_context.last_update.clone(),
+                max_connections_per_peer,
+                || Arc::new(ConnectionStreamCounter::new()),
+            )
+        {
+            update_open_connections_stat(&self.stats, &connection_table_l);
+            drop(connection_table_l);
+
+            connection.set_max_concurrent_uni_streams(max_uni_streams);
+
+            Ok((last_update, cancel_connection, stream_counter))
+        } else {
+            self.stats
+                .connection_add_failed
+                .fetch_add(1, Ordering::Relaxed);
+            Err(ConnectionHandlerError::ConnectionAddError)
+        }
     }
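Why the whole error branch, MaxStreamError, and the exceed_max_stream_count close code can be deleted: VarInt::from_u64 is fallible because a QUIC varint tops out at 2^62 - 1, but every u32 value fits, so quinn provides an infallible VarInt::from_u32, and the function's new u32 return type makes the conversion total. A quick standalone check (assuming quinn as a dependency):

use quinn::VarInt;

fn main() {
    // from_u64 must be checked: values above 2^62 - 1 don't fit a QUIC varint.
    assert!(VarInt::from_u64(u64::MAX).is_err());
    // Any u32 fits, so from_u32 is infallible -- hence the removed
    // CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT path and MaxStreamError variant.
    let v = VarInt::from_u32(u32::MAX);
    assert_eq!(v.into_inner(), u32::MAX as u64);
}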

@@ -528,27 +534,35 @@ pub mod test {
     #[test]
     fn test_max_allowed_uni_streams() {
         assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0),
-            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
+            compute_max_allowed_uni_streams(REFERENCE_RTT_MS, ConnectionPeerType::Unstaked, 0),
+            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u32
         );
         assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Staked(10), 0),
-            QUIC_MIN_STAKED_CONCURRENT_STREAMS
+            compute_max_allowed_uni_streams(REFERENCE_RTT_MS, ConnectionPeerType::Staked(10), 0),
+            QUIC_MIN_STAKED_CONCURRENT_STREAMS as u32
         );
         let delta =
             (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64;
         assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Staked(1000), 10000),
-            QUIC_MAX_STAKED_CONCURRENT_STREAMS,
+            compute_max_allowed_uni_streams(
+                REFERENCE_RTT_MS,
+                ConnectionPeerType::Staked(1000),
+                10000
+            ),
+            QUIC_MAX_STAKED_CONCURRENT_STREAMS as u32,
         );
         assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Staked(100), 10000),
+            compute_max_allowed_uni_streams(
+                REFERENCE_RTT_MS,
+                ConnectionPeerType::Staked(100),
+                10000
+            ),
             ((delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS)
-                .min(QUIC_MAX_STAKED_CONCURRENT_STREAMS)
+                .min(QUIC_MAX_STAKED_CONCURRENT_STREAMS) as u32
         );
         assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10000),
-            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
+            compute_max_allowed_uni_streams(REFERENCE_RTT_MS, ConnectionPeerType::Unstaked, 10000),
+            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u32
        );
     }
 }
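The updated cases all pin behavior at REFERENCE_RTT_MS, where the scaling factor is 1. A hypothetical extra case (not part of this PR) exercising the scaled path could look like:

#[test]
fn test_max_allowed_uni_streams_rtt_scaled() {
    // At the RTT cap the limit grows by MAX_RTT_MS / REFERENCE_RTT_MS = 7x...
    assert_eq!(
        compute_max_allowed_uni_streams(MAX_RTT_MS, ConnectionPeerType::Unstaked, 0),
        7 * QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u32
    );
    // ...and RTTs below the reference are clamped up, leaving the legacy limit.
    assert_eq!(
        compute_max_allowed_uni_streams(10, ConnectionPeerType::Unstaked, 0),
        QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u32
    );
}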