@@ -30,13 +30,26 @@ use {
3030 std:: {
3131 iter:: repeat_with,
3232 net:: { IpAddr , SocketAddr , UdpSocket } ,
33+ // CAUTION: be careful not to introduce any awaits while holding an RwLock.
3334 sync:: {
3435 atomic:: { AtomicBool , AtomicU64 , Ordering } ,
35- Arc , Mutex , MutexGuard , RwLock ,
36+ Arc , RwLock ,
3637 } ,
3738 time:: { Duration , Instant } ,
3839 } ,
39- tokio:: { task:: JoinHandle , time:: timeout} ,
40+ tokio:: {
41+ // CAUTION: It's kind of sketch that we're mixing async and sync locks (see the RwLock above).
42+ // This is done so that sync code can also access the stake table.
43+ // Make sure we don't hold a sync lock across an await - including the await to
44+ // lock an async Mutex. This does not happen now and should not happen as long as we
45+ // don't hold an async Mutex and sync RwLock at the same time (currently true)
46+ // but if we do, the scope of the RwLock must always be a subset of the async Mutex
47+ // (i.e. lock order is always async Mutex -> RwLock). Also, be careful not to
48+ // introduce any other awaits while holding the RwLock.
49+ sync:: { Mutex , MutexGuard } ,
50+ task:: JoinHandle ,
51+ time:: timeout,
52+ } ,
4053} ;
4154
4255/// Limit to 500K PPS
@@ -383,7 +396,7 @@ fn handle_and_cache_new_connection(
383396 }
384397}
385398
386- fn prune_unstaked_connections_and_add_new_connection (
399+ async fn prune_unstaked_connections_and_add_new_connection (
387400 connection : Connection ,
388401 connection_table : Arc < Mutex < ConnectionTable > > ,
389402 max_connections : usize ,
@@ -393,7 +406,7 @@ fn prune_unstaked_connections_and_add_new_connection(
393406 let stats = params. stats . clone ( ) ;
394407 if max_connections > 0 {
395408 let connection_table_clone = connection_table. clone ( ) ;
396- let mut connection_table = connection_table. lock ( ) . unwrap ( ) ;
409+ let mut connection_table = connection_table. lock ( ) . await ;
397410 prune_unstaked_connection_table ( & mut connection_table, max_connections, stats) ;
398411 handle_and_cache_new_connection (
399412 connection,
@@ -496,7 +509,8 @@ async fn setup_connection(
496509 ) ;
497510
498511 if params. stake > 0 {
499- let mut connection_table_l = staked_connection_table. lock ( ) . unwrap ( ) ;
512+ let mut connection_table_l = staked_connection_table. lock ( ) . await ;
513+
500514 if connection_table_l. total_size >= max_staked_connections {
501515 let num_pruned =
502516 connection_table_l. prune_random ( PRUNE_RANDOM_SAMPLE_SIZE , params. stake ) ;
@@ -525,7 +539,9 @@ async fn setup_connection(
525539 max_unstaked_connections,
526540 & params,
527541 wait_for_chunk_timeout,
528- ) {
542+ )
543+ . await
544+ {
529545 stats
530546 . connection_added_from_staked_peer
531547 . fetch_add ( 1 , Ordering :: Relaxed ) ;
@@ -544,7 +560,9 @@ async fn setup_connection(
544560 max_unstaked_connections,
545561 & params,
546562 wait_for_chunk_timeout,
547- ) {
563+ )
564+ . await
565+ {
548566 stats
549567 . connection_added_from_unstaked_peer
550568 . fetch_add ( 1 , Ordering :: Relaxed ) ;
@@ -807,7 +825,7 @@ async fn handle_connection(
807825 }
808826 }
809827
810- let removed_connection_count = connection_table. lock ( ) . unwrap ( ) . remove_connection (
828+ let removed_connection_count = connection_table. lock ( ) . await . remove_connection (
811829 ConnectionTableKey :: new ( remote_addr. ip ( ) , params. remote_pubkey ) ,
812830 remote_addr. port ( ) ,
813831 stable_id,
0 commit comments