Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 113 additions & 84 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,9 @@ use lightning::onion_message::messenger::AOnionMessenger;
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
use lightning::routing::utxo::UtxoLookup;
use lightning::sign::ChangeDestinationSource;
#[cfg(feature = "std")]
use lightning::sign::ChangeDestinationSourceSync;
use lightning::sign::EntropySource;
use lightning::sign::OutputSpender;
use lightning::sign::{
ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender,
};
use lightning::util::logger::Logger;
use lightning::util::persist::{
KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
Expand All @@ -61,9 +59,7 @@ use lightning::util::persist::{
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::sweep::OutputSweeper;
#[cfg(feature = "std")]
use lightning::util::sweep::OutputSweeperSync;
use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
#[cfg(feature = "std")]
use lightning::util::wakers::Sleeper;
use lightning_rapid_gossip_sync::RapidGossipSync;
Expand Down Expand Up @@ -304,7 +300,7 @@ where

/// Updates scorer based on event and returns whether an update occurred so we can decide whether
/// to persist.
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
fn update_scorer<'a, S: Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
scorer: &'a S, event: &Event, duration_since_epoch: Duration,
) -> bool {
match event {
Expand Down Expand Up @@ -866,31 +862,30 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
///```
pub async fn process_events_async<
'a,
UL: 'static + Deref,
CF: 'static + Deref,
T: 'static + Deref,
F: 'static + Deref,
G: 'static + Deref<Target = NetworkGraph<L>>,
L: 'static + Deref,
P: 'static + Deref,
UL: Deref,
CF: Deref,
T: Deref,
F: Deref,
G: Deref<Target = NetworkGraph<L>>,
L: Deref,
P: Deref,
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
EventHandler: Fn(Event) -> EventHandlerFuture,
ES: 'static + Deref + Send,
M: 'static
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
ES: Deref + Send,
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
+ Send
+ Sync,
CM: 'static + Deref,
OM: 'static + Deref,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>>,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
PM: 'static + Deref,
LM: 'static + Deref,
D: 'static + Deref,
O: 'static + Deref,
K: 'static + Deref,
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
S: 'static + Deref<Target = SC> + Send + Sync,
CM: Deref,
OM: Deref,
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
RGS: Deref<Target = RapidGossipSync<G, L>>,
PM: Deref,
LM: Deref,
D: Deref,
O: Deref,
K: Deref,
OS: Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
S: Deref<Target = SC> + Send + Sync,
SC: for<'b> WriteableScore<'b>,
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
Sleeper: Fn(Duration) -> SleepFuture,
Expand All @@ -902,20 +897,20 @@ pub async fn process_events_async<
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
UL::Target: 'static + UtxoLookup,
CF::Target: 'static + chain::Filter,
T::Target: 'static + BroadcasterInterface,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
ES::Target: 'static + EntropySource,
UL::Target: UtxoLookup,
CF::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
ES::Target: EntropySource,
CM::Target: AChannelManager,
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
LM::Target: ALiquidityManager,
O::Target: 'static + OutputSpender,
D::Target: 'static + ChangeDestinationSource,
K::Target: 'static + KVStore,
O::Target: OutputSpender,
D::Target: ChangeDestinationSource,
K::Target: KVStore,
{
let async_event_handler = |event| {
let network_graph = gossip_sync.network_graph();
Expand Down Expand Up @@ -1340,31 +1335,30 @@ fn check_and_reset_sleeper<
/// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for
/// synchronous background persistence.
pub async fn process_events_async_with_kv_store_sync<
UL: 'static + Deref,
CF: 'static + Deref,
T: 'static + Deref,
F: 'static + Deref,
G: 'static + Deref<Target = NetworkGraph<L>>,
L: 'static + Deref + Send + Sync,
P: 'static + Deref,
UL: Deref,
CF: Deref,
T: Deref,
F: Deref,
G: Deref<Target = NetworkGraph<L>>,
L: Deref + Send + Sync,
P: Deref,
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
EventHandler: Fn(Event) -> EventHandlerFuture,
ES: 'static + Deref + Send,
M: 'static
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
ES: Deref + Send,
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
+ Send
+ Sync,
CM: 'static + Deref + Send + Sync,
OM: 'static + Deref,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>>,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>>,
PM: 'static + Deref,
LM: 'static + Deref,
D: 'static + Deref,
O: 'static + Deref,
K: 'static + Deref,
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, KVStoreSyncWrapper<K>, L, O>>,
S: 'static + Deref<Target = SC> + Send + Sync,
CM: Deref + Send + Sync,
OM: Deref,
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
RGS: Deref<Target = RapidGossipSync<G, L>>,
PM: Deref,
LM: Deref,
D: Deref,
O: Deref,
K: Deref,
OS: Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unclear to me now why I didn't do this in the first place.

S: Deref<Target = SC> + Send + Sync,
SC: for<'b> WriteableScore<'b>,
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
Sleeper: Fn(Duration) -> SleepFuture,
Expand All @@ -1376,20 +1370,20 @@ pub async fn process_events_async_with_kv_store_sync<
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
UL::Target: 'static + UtxoLookup,
CF::Target: 'static + chain::Filter,
T::Target: 'static + BroadcasterInterface,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
ES::Target: 'static + EntropySource,
UL::Target: UtxoLookup,
CF::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
ES::Target: EntropySource,
CM::Target: AChannelManager,
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
LM::Target: ALiquidityManager,
O::Target: 'static + OutputSpender,
D::Target: 'static + ChangeDestinationSource,
K::Target: 'static + KVStoreSync,
O::Target: OutputSpender,
D::Target: ChangeDestinationSourceSync,
K::Target: KVStoreSync,
{
let kv_store = KVStoreSyncWrapper(kv_store);
process_events_async(
Expand All @@ -1401,7 +1395,7 @@ where
gossip_sync,
peer_manager,
liquidity_manager,
sweeper,
sweeper.as_ref().map(|os| os.sweeper_async()),
logger,
scorer,
sleeper,
Expand Down Expand Up @@ -1846,11 +1840,13 @@ mod tests {
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::ser::Writeable;
use lightning::util::sweep::{OutputSpendStatus, OutputSweeperSync, PRUNE_DELAY_BLOCKS};
use lightning::util::sweep::{
OutputSpendStatus, OutputSweeper, OutputSweeperSync, PRUNE_DELAY_BLOCKS,
};
use lightning::util::test_utils;
use lightning::{get_event, get_event_msg};
use lightning_liquidity::utils::time::DefaultTimeProvider;
use lightning_liquidity::{ALiquidityManagerSync, LiquidityManagerSync};
use lightning_liquidity::{ALiquidityManagerSync, LiquidityManager, LiquidityManagerSync};
use lightning_persister::fs_store::FilesystemStore;
use lightning_rapid_gossip_sync::RapidGossipSync;
use std::collections::VecDeque;
Expand Down Expand Up @@ -1953,7 +1949,7 @@ mod tests {
Arc<ChannelManager>,
Arc<dyn Filter + Sync + Send>,
Arc<Persister>,
Arc<DefaultTimeProvider>,
DefaultTimeProvider,
>;

struct Node {
Expand Down Expand Up @@ -2779,7 +2775,18 @@ mod tests {
let kv_store_sync = Arc::new(
Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
);
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
let kv_store = KVStoreSyncWrapper(kv_store_sync);

// Yes, you can unsafe { turn off the borrow checker }
let lm_async: &'static LiquidityManager<_, _, _, _, _, _> = unsafe {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This commit has a nice end result, but this - I am not sure how many devs understand it...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope the comment is clear enough? Easy to just update the comment later if it makes it more readable.

&*(nodes[0].liquidity_manager.get_lm_async()
as *const LiquidityManager<_, _, _, _, _, _>)
as &'static LiquidityManager<_, _, _, _, _, _>
};
let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
as &'static OutputSweeper<_, _, _, _, _, _, _>
};

let bp_future = super::process_events_async(
kv_store,
Expand All @@ -2789,8 +2796,8 @@ mod tests {
Some(Arc::clone(&nodes[0].messenger)),
nodes[0].rapid_gossip_sync(),
Arc::clone(&nodes[0].peer_manager),
Some(nodes[0].liquidity_manager.get_lm_async()),
Some(nodes[0].sweeper.sweeper_async()),
Some(lm_async),
Some(sweeper_async),
Arc::clone(&nodes[0].logger),
Some(Arc::clone(&nodes[0].scorer)),
move |dur: Duration| {
Expand Down Expand Up @@ -3287,7 +3294,18 @@ mod tests {
let data_dir = nodes[0].kv_store.get_data_dir();
let kv_store_sync =
Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
let kv_store = KVStoreSyncWrapper(kv_store_sync);

// Yes, you can unsafe { turn off the borrow checker }
let lm_async: &'static LiquidityManager<_, _, _, _, _, _> = unsafe {
&*(nodes[0].liquidity_manager.get_lm_async()
as *const LiquidityManager<_, _, _, _, _, _>)
as &'static LiquidityManager<_, _, _, _, _, _>
};
let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
as &'static OutputSweeper<_, _, _, _, _, _, _>
};

let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
let bp_future = super::process_events_async(
Expand All @@ -3298,8 +3316,8 @@ mod tests {
Some(Arc::clone(&nodes[0].messenger)),
nodes[0].rapid_gossip_sync(),
Arc::clone(&nodes[0].peer_manager),
Some(nodes[0].liquidity_manager.get_lm_async()),
Some(nodes[0].sweeper.sweeper_async()),
Some(lm_async),
Some(sweeper_async),
Arc::clone(&nodes[0].logger),
Some(Arc::clone(&nodes[0].scorer)),
move |dur: Duration| {
Expand Down Expand Up @@ -3501,10 +3519,21 @@ mod tests {
let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
let data_dir = nodes[0].kv_store.get_data_dir();
let kv_store_sync = Arc::new(Persister::new(data_dir));
let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync));
let kv_store = KVStoreSyncWrapper(kv_store_sync);

let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());

// Yes, you can unsafe { turn off the borrow checker }
let lm_async: &'static LiquidityManager<_, _, _, _, _, _> = unsafe {
&*(nodes[0].liquidity_manager.get_lm_async()
as *const LiquidityManager<_, _, _, _, _, _>)
as &'static LiquidityManager<_, _, _, _, _, _>
};
let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
as &'static OutputSweeper<_, _, _, _, _, _, _>
};

let bp_future = super::process_events_async(
kv_store,
event_handler,
Expand All @@ -3513,8 +3542,8 @@ mod tests {
Some(Arc::clone(&nodes[0].messenger)),
nodes[0].no_gossip_sync(),
Arc::clone(&nodes[0].peer_manager),
Some(nodes[0].liquidity_manager.get_lm_async()),
Some(nodes[0].sweeper.sweeper_async()),
Some(lm_async),
Some(sweeper_async),
Arc::clone(&nodes[0].logger),
Some(Arc::clone(&nodes[0].scorer)),
move |dur: Duration| {
Expand Down
Loading
Loading