Skip to content

Commit 807bfaf

Browse files
authored
cuprated: add txpool manager (Cuprate#483)
* add txpool manager * add version check for tx-pool * add docs * fix merge * fix test * fix merge * fix cargo hack * clean up imports * fix typo * add small buffer to rebroadcasts * review fixes * small clean up * fmt * fix merge * review fixes
1 parent 32b3572 commit 807bfaf

File tree

31 files changed

+804
-182
lines changed

31 files changed

+804
-182
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

binaries/cuprated/src/blockchain/manager.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::{
2828
types::ConsensusBlockchainReadHandle,
2929
},
3030
constants::PANIC_CRITICAL_SERVICE_ERROR,
31+
txpool::TxpoolManagerHandle,
3132
};
3233

3334
mod commands;
@@ -46,7 +47,7 @@ pub async fn init_blockchain_manager(
4647
clearnet_interface: NetworkInterface<ClearNet>,
4748
blockchain_write_handle: BlockchainWriteHandle,
4849
blockchain_read_handle: BlockchainReadHandle,
49-
txpool_write_handle: TxpoolWriteHandle,
50+
txpool_manager_handle: TxpoolManagerHandle,
5051
mut blockchain_context_service: BlockchainContextService,
5152
block_downloader_config: BlockDownloaderConfig,
5253
) {
@@ -72,7 +73,7 @@ pub async fn init_blockchain_manager(
7273
blockchain_read_handle,
7374
BoxError::from,
7475
),
75-
txpool_write_handle,
76+
txpool_manager_handle,
7677
blockchain_context_service,
7778
stop_current_block_downloader,
7879
broadcast_svc: clearnet_interface.broadcast_svc(),
@@ -93,8 +94,8 @@ pub struct BlockchainManager {
9394
blockchain_write_handle: BlockchainWriteHandle,
9495
/// A [`BlockchainReadHandle`].
9596
blockchain_read_handle: ConsensusBlockchainReadHandle,
96-
/// A [`TxpoolWriteHandle`].
97-
txpool_write_handle: TxpoolWriteHandle,
97+
98+
txpool_manager_handle: TxpoolManagerHandle,
9899
/// The blockchain context cache, this caches the current state of the blockchain to quickly calculate/retrieve
99100
/// values without needing to go to a [`BlockchainReadHandle`].
100101
blockchain_context_service: BlockchainContextService,

binaries/cuprated/src/blockchain/manager/handler.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -619,11 +619,8 @@ impl super::BlockchainManager {
619619
.await
620620
.expect(PANIC_CRITICAL_SERVICE_ERROR);
621621

622-
self.txpool_write_handle
623-
.ready()
624-
.await
625-
.expect(PANIC_CRITICAL_SERVICE_ERROR)
626-
.call(TxpoolWriteRequest::NewBlock { spent_key_images })
622+
self.txpool_manager_handle
623+
.new_block(spent_key_images)
627624
.await
628625
.expect(PANIC_CRITICAL_SERVICE_ERROR);
629626
}

binaries/cuprated/src/blockchain/manager/tests.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@ use cuprate_p2p::{block_downloader::BlockBatch, BroadcastSvc};
1414
use cuprate_p2p_core::handles::HandleBuilder;
1515
use cuprate_types::{CachedVerificationState, TransactionVerificationData, TxVersion};
1616

17-
use crate::blockchain::{
18-
check_add_genesis, manager::BlockchainManager, manager::BlockchainManagerCommand,
19-
ConsensusBlockchainReadHandle,
17+
use crate::{
18+
blockchain::{
19+
check_add_genesis, manager::BlockchainManager, manager::BlockchainManagerCommand,
20+
ConsensusBlockchainReadHandle,
21+
},
22+
txpool::TxpoolManagerHandle,
2023
};
2124

2225
async fn mock_manager(data_dir: PathBuf) -> BlockchainManager {
@@ -30,7 +33,7 @@ async fn mock_manager(data_dir: PathBuf) -> BlockchainManager {
3033
let (mut blockchain_read_handle, mut blockchain_write_handle, _) =
3134
cuprate_blockchain::service::init(blockchain_config).unwrap();
3235
let (txpool_read_handle, txpool_write_handle, _) =
33-
cuprate_txpool::service::init(txpool_config).unwrap();
36+
cuprate_txpool::service::init(&txpool_config).unwrap();
3437

3538
check_add_genesis(
3639
&mut blockchain_read_handle,
@@ -56,7 +59,7 @@ async fn mock_manager(data_dir: PathBuf) -> BlockchainManager {
5659
BlockchainManager {
5760
blockchain_write_handle,
5861
blockchain_read_handle,
59-
txpool_write_handle,
62+
txpool_manager_handle: TxpoolManagerHandle::mock(),
6063
blockchain_context_service,
6164
stop_current_block_downloader: Arc::new(Default::default()),
6265
broadcast_svc: BroadcastSvc::mock(),

binaries/cuprated/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use fs::FileSystemConfig;
4545
use p2p::P2PConfig;
4646
use rayon::RayonConfig;
4747
pub use rpc::RpcConfig;
48-
use storage::StorageConfig;
48+
pub use storage::{StorageConfig, TxpoolConfig};
4949
use tokio::TokioConfig;
5050
use tor::TorConfig;
5151
use tracing_config::TracingConfig;

binaries/cuprated/src/config/storage.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ config_struct! {
6262
pub struct BlockchainConfig { }
6363

6464
/// The tx-pool config.
65-
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
65+
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
6666
#[serde(deny_unknown_fields, default)]
6767
pub struct TxpoolConfig {
6868
/// The maximum size of the tx-pool.
@@ -71,6 +71,14 @@ config_struct! {
7171
/// Valid values | >= 0
7272
/// Examples | 100_000_000, 50_000_000
7373
pub max_txpool_byte_size: usize,
74+
75+
/// The maximum age of transactions in the pool in seconds.
76+
/// Transactions will be dropped after this time is reached.
77+
///
78+
/// Type | Number
79+
/// Valid values | >= 0
80+
/// Examples | 100_000_000, 50_000_000
81+
pub maximum_age_secs: u64,
7482
}
7583
}
7684

@@ -79,6 +87,7 @@ impl Default for TxpoolConfig {
7987
Self {
8088
sync_mode: SyncMode::default(),
8189
max_txpool_byte_size: 100_000_000,
90+
maximum_age_secs: 60 * 60 * 24,
8291
}
8392
}
8493
}

binaries/cuprated/src/main.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ fn main() {
9292
.expect(DATABASE_CORRUPT_MSG);
9393

9494
let (txpool_read_handle, txpool_write_handle, _) =
95-
cuprate_txpool::service::init_with_pool(config.txpool_config(), db_thread_pool)
95+
cuprate_txpool::service::init_with_pool(&config.txpool_config(), db_thread_pool)
9696
.inspect_err(|e| error!("Txpool database error: {e}"))
9797
.expect(DATABASE_CORRUPT_MSG);
9898

@@ -137,13 +137,15 @@ fn main() {
137137

138138
// Create the incoming tx handler service.
139139
let tx_handler = IncomingTxHandler::init(
140+
config.storage.txpool.clone(),
140141
network_interfaces.clearnet_network_interface.clone(),
141142
network_interfaces.tor_network_interface,
142143
txpool_write_handle.clone(),
143144
txpool_read_handle.clone(),
144145
context_svc.clone(),
145146
blockchain_read_handle.clone(),
146-
);
147+
)
148+
.await;
147149

148150
// Send tx handler sender to all network zones
149151
for zone in tx_handler_subscribers {
@@ -157,7 +159,7 @@ fn main() {
157159
network_interfaces.clearnet_network_interface,
158160
blockchain_write_handle,
159161
blockchain_read_handle.clone(),
160-
txpool_write_handle.clone(),
162+
tx_handler.txpool_manager.clone(),
161163
context_svc.clone(),
162164
config.block_downloader_config(),
163165
)

binaries/cuprated/src/p2p/request_handler.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use monero_serai::{block::Block, transaction::Transaction};
1414
use tokio::sync::{broadcast, oneshot, watch};
1515
use tokio_stream::wrappers::WatchStream;
1616
use tower::{Service, ServiceExt};
17+
use tracing::instrument;
1718

1819
use cuprate_blockchain::service::BlockchainReadHandle;
1920
use cuprate_consensus::{
@@ -22,10 +23,9 @@ use cuprate_consensus::{
2223
};
2324
use cuprate_dandelion_tower::TxState;
2425
use cuprate_fixed_bytes::ByteArrayVec;
25-
use cuprate_helper::cast::u64_to_usize;
2626
use cuprate_helper::{
2727
asynch::rayon_spawn_async,
28-
cast::usize_to_u64,
28+
cast::{u64_to_usize, usize_to_u64},
2929
map::{combine_low_high_bits_to_u128, split_u128_into_low_high_bits},
3030
};
3131
use cuprate_p2p::constants::{
@@ -363,6 +363,7 @@ async fn new_fluffy_block<A: NetZoneAddress>(
363363
}
364364

365365
/// [`ProtocolRequest::NewTransactions`]
366+
#[instrument(level = "debug", skip_all, fields(txs = request.txs.len(), stem = !request.dandelionpp_fluff))]
366367
async fn new_transactions<A>(
367368
peer_information: PeerInformation<A>,
368369
request: NewTransactions,
@@ -373,16 +374,22 @@ where
373374
A: NetZoneAddress,
374375
InternalPeerID<A>: Into<CrossNetworkInternalPeerId>,
375376
{
377+
tracing::debug!("handling new transactions");
378+
376379
let context = blockchain_context_service.blockchain_context();
377380

378381
// If we are more than 2 blocks behind the peer then ignore the txs - we are probably still syncing.
379-
if usize_to_u64(context.chain_height + 2)
380-
< peer_information
381-
.core_sync_data
382-
.lock()
383-
.unwrap()
384-
.current_height
385-
{
382+
let peer_height = peer_information
383+
.core_sync_data
384+
.lock()
385+
.unwrap()
386+
.current_height;
387+
if usize_to_u64(context.chain_height + 2) < peer_height {
388+
tracing::debug!(
389+
our_height = context.chain_height,
390+
peer_height,
391+
"we are too far behind peer, ignoring txs."
392+
);
386393
return Ok(ProtocolResponse::NA);
387394
}
388395

binaries/cuprated/src/rpc/handlers/json_rpc.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use cuprate_helper::{
2323
cast::{u32_to_usize, u64_to_usize, usize_to_u64},
2424
fmt::HexPrefix,
2525
map::split_u128_into_low_high_bits,
26+
time::current_unix_timestamp,
2627
};
2728
use cuprate_hex::{Hex, HexVec};
2829
use cuprate_p2p_core::{client::handshaker::builder::DummyAddressBook, ClearNet, Network};
@@ -923,13 +924,15 @@ async fn get_transaction_pool_backlog(
923924
mut state: CupratedRpcHandler,
924925
_: GetTransactionPoolBacklogRequest,
925926
) -> Result<GetTransactionPoolBacklogResponse, Error> {
927+
let now = current_unix_timestamp();
928+
926929
let backlog = txpool::backlog(&mut state.txpool_read)
927930
.await?
928931
.into_iter()
929932
.map(|entry| TxBacklogEntry {
930-
weight: entry.weight,
933+
weight: usize_to_u64(entry.weight),
931934
fee: entry.fee,
932-
time_in_pool: entry.time_in_pool.as_secs(),
935+
time_in_pool: now - entry.received_at,
933936
})
934937
.collect();
935938

@@ -968,7 +971,7 @@ async fn get_miner_data(
968971
.into_iter()
969972
.map(|entry| GetMinerDataTxBacklogEntry {
970973
id: Hex(entry.id),
971-
weight: entry.weight,
974+
weight: usize_to_u64(entry.weight),
972975
fee: entry.fee,
973976
})
974977
.collect();

binaries/cuprated/src/txpool.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ use cuprate_txpool::service::{TxpoolReadHandle, TxpoolWriteHandle};
88

99
mod dandelion;
1010
mod incoming_tx;
11+
mod manager;
1112
mod relay_rules;
1213
mod txs_being_handled;
1314

1415
pub use incoming_tx::{IncomingTxError, IncomingTxHandler, IncomingTxs};
16+
pub use manager::TxpoolManagerHandle;
1517
pub use relay_rules::RelayRuleError;

0 commit comments

Comments
 (0)