Skip to content

Commit b5be55b

Browse files
authored
fix(coprocessor): host-listener, trust tx & logs order from get_logs (#1946)
* fix(coprocessor): host-listener, trust tx & logs order from get_logs * fix(coprocessor): host-listener, ensure logs are in order
1 parent 0673343 commit b5be55b

5 files changed

Lines changed: 80 additions & 215 deletions

File tree

coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs

Lines changed: 74 additions & 215 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::collections::hash_map::Entry;
22
use std::collections::{HashMap, HashSet};
33

4-
use tracing::{debug, error, info};
4+
use tracing::{debug, error, info, warn};
55
use union_find::{QuickUnionUf, UnionBySize, UnionFind};
66

77
use crate::database::tfhe_event_propagate::{
@@ -40,6 +40,15 @@ impl Transaction {
4040
}
4141
}
4242

43+
fn ensure_logs_order(logs: &mut [LogTfhe]) {
44+
if logs.iter().any(|log| log.log_index.is_none()) {
45+
warn!("Log without index, cannot ensure order, assuming it's ordered");
46+
return;
47+
}
48+
// Note: there is a fast path for already sorted logs
49+
logs.sort_by_key(|log| log.log_index.unwrap_or(0));
50+
}
51+
4352
const AVG_LOGS_PER_TX: usize = 8;
4453
fn scan_transactions(
4554
logs: &[LogTfhe],
@@ -76,73 +85,38 @@ fn scan_transactions(
7685
(ordered_txs_hash, txs)
7786
}
7887

79-
fn tx_of_handle(
80-
ordered_txs: &HashMap<TransactionHash, Transaction>,
81-
) -> (
82-
HashMap<Handle, TransactionHash>,
83-
HashMap<Handle, HashSet<TransactionHash>>,
84-
) {
85-
// handle to tx maps
86-
let mut handle_creator = HashMap::new(); // no intermediate value
87-
let mut handle_consumer = HashMap::new();
88-
for tx in ordered_txs.values() {
89-
for handle in &tx.allowed_handle {
90-
handle_creator.insert(*handle, tx.tx_hash);
91-
}
92-
}
93-
for tx in ordered_txs.values() {
94-
for handle in &tx.input_handle {
95-
if tx.output_handle.contains(handle) {
96-
// self dependency, ignore
97-
continue;
98-
}
99-
if !handle_creator.contains_key(handle) {
100-
// non allowed handle, could be from past chain
101-
continue;
102-
}
103-
match handle_consumer.entry(*handle) {
104-
Entry::Vacant(e) => {
105-
let mut set = HashSet::new();
106-
set.insert(tx.tx_hash);
107-
e.insert(set);
108-
}
109-
Entry::Occupied(mut e) => {
110-
e.get_mut().insert(tx.tx_hash);
111-
}
112-
}
113-
}
114-
}
115-
(handle_creator, handle_consumer)
116-
}
117-
11888
async fn fill_tx_dependence_maps(
89+
ordered_txs_hash: &[TransactionHash],
11990
txs: &mut HashMap<TransactionHash, Transaction>,
12091
used_txs_chains: &mut HashMap<TransactionHash, HashSet<TransactionHash>>,
12192
past_chains: &ChainCache,
12293
) {
123-
// handle to tx maps
124-
let (handle_creator, handle_consumer) = tx_of_handle(txs);
125-
// txs relations
126-
for tx in txs.values_mut() {
94+
let mut allowed_handle_tx: HashMap<Handle, TransactionHash> =
95+
HashMap::new();
96+
for tx_hash in ordered_txs_hash {
97+
let Some(tx) = txs.get_mut(tx_hash) else {
98+
error!("Tx hash {:?} not found in txs map", tx_hash);
99+
continue;
100+
};
127101
// this tx depends on dep_tx
102+
let mut producer_tx = Vec::with_capacity(tx.input_handle.len());
128103
for input_handle in &tx.input_handle {
129-
if tx.output_handle.contains(input_handle) {
130-
// self dependency, ignore
131-
continue;
132-
}
133-
if let Some(dep_tx) = handle_creator.get(input_handle) {
104+
if let Some(dep_tx) = allowed_handle_tx.get(input_handle) {
134105
// intra block
106+
// mark as consumer
135107
tx.input_tx.insert(*dep_tx);
136108
used_txs_chains
137109
.entry(*dep_tx)
138110
.and_modify(|v| {
139-
v.insert(tx.tx_hash);
111+
v.insert(*tx_hash);
140112
})
141113
.or_insert({
142114
let mut h = HashSet::new();
143-
h.insert(tx.tx_hash);
115+
h.insert(*tx_hash);
144116
h
145117
});
118+
// memorize as producer
119+
producer_tx.push(*dep_tx);
146120
} else if let Some(dep_tx_hash) =
147121
past_chains.write().await.get(input_handle)
148122
{
@@ -160,129 +134,22 @@ async fn fill_tx_dependence_maps(
160134
});
161135
}
162136
}
163-
// this tx is used by consumer_tx
164-
for output_handle in &tx.output_handle {
165-
let Some(consumer_txs) = handle_consumer.get(output_handle) else {
166-
continue;
167-
};
168-
for dep_tx in consumer_txs {
169-
if *dep_tx == tx.tx_hash {
170-
// self dependency, ignore
171-
continue;
172-
}
173-
tx.output_tx.insert(*dep_tx);
174-
}
137+
// update allowed handle for next txs
138+
for allowed_handle in &tx.allowed_handle {
139+
allowed_handle_tx.entry(*allowed_handle).or_insert(*tx_hash);
175140
}
176-
}
177-
}
178-
179-
fn topological_order(
180-
ordered_hash: Vec<TransactionHash>,
181-
mut txs: HashMap<TransactionHash, Transaction>,
182-
) -> Vec<Transaction> {
183-
let mut seen_tx: HashSet<TransactionHash> =
184-
HashSet::with_capacity(txs.len());
185-
let mut is_already_sorted = true;
186-
for &tx_hash in &ordered_hash {
187-
let Some(tx) = txs.get(&tx_hash) else {
188-
error!("Transaction {:?} missing in txs map", tx_hash);
189-
continue;
190-
};
141+
// propagate memorized producers
191142
let mut depth_size = 0;
192-
for input_tx in &tx.input_tx {
193-
match txs.get(input_tx) {
194-
None => {
195-
// previous block tx, already seen
196-
continue;
197-
}
198-
Some(dep_tx) => {
199-
depth_size =
200-
depth_size.max(dep_tx.depth_size + dep_tx.size);
201-
}
202-
}
203-
if !seen_tx.contains(input_tx) {
204-
is_already_sorted = false;
205-
error!("Out of order transaction detected: tx {:?} depends on tx {:?} which is later in the block", tx_hash, input_tx);
206-
break;
207-
}
208-
}
209-
if let Some(tx) = txs.get_mut(&tx_hash) {
210-
tx.depth_size = depth_size;
211-
}
212-
seen_tx.insert(tx_hash);
213-
}
214-
if is_already_sorted {
215-
return ordered_hash
216-
.iter()
217-
.filter_map(|tx_hash| txs.remove(tx_hash))
218-
.collect();
219-
}
220-
let mut done_tx = HashSet::with_capacity(txs.len());
221-
let mut stacked_tx = HashSet::with_capacity(txs.len());
222-
let mut stack = Vec::new();
223-
let mut reordered = Vec::with_capacity(txs.len());
224-
for tx_hash in ordered_hash {
225-
stacked_tx.clear();
226-
stack.push(tx_hash);
227-
stacked_tx.insert(tx_hash);
228-
while let Some(tx_hash) = stack.pop() {
229-
if done_tx.contains(&tx_hash) {
230-
continue;
231-
}
232-
let Some(tx) = txs.get(&tx_hash) else {
233-
// previous block tx, already seen
234-
reordered.push(tx_hash);
235-
done_tx.insert(tx_hash);
236-
continue;
237-
};
238-
let mut unseen = vec![];
239-
let mut depth_size = 0;
240-
for input_tx in &tx.input_tx {
241-
match txs.get(input_tx) {
242-
None => {
243-
// previous block tx, already seen
244-
continue;
245-
}
246-
Some(dep_tx) => {
247-
depth_size =
248-
depth_size.max(dep_tx.depth_size + dep_tx.size);
249-
}
250-
}
251-
if !done_tx.contains(input_tx) {
252-
unseen.push(*input_tx);
253-
}
254-
}
255-
if unseen.is_empty() {
256-
if let Some(tx) = txs.get_mut(&tx_hash) {
257-
tx.depth_size = depth_size;
258-
}
259-
reordered.push(tx_hash);
260-
done_tx.insert(tx_hash);
261-
} else {
262-
let mut cut_cycle = false;
263-
for unseen_tx_hash in unseen.iter() {
264-
error!("Reordering transaction: tx {:?} depends on unseen tx {:?}", tx, txs.get(unseen_tx_hash));
265-
if stacked_tx.contains(unseen_tx_hash) {
266-
error!("Fake cyclic dependency detected for transaction {:?}, cutting", tx_hash);
267-
cut_cycle = true;
268-
}
269-
}
270-
if cut_cycle {
271-
reordered.push(tx_hash);
272-
done_tx.insert(tx_hash);
273-
continue;
274-
}
275-
stack.push(tx_hash);
276-
stack.extend(unseen.clone());
277-
stacked_tx.extend(unseen);
278-
}
143+
for dep_tx in &producer_tx {
144+
txs.entry(*dep_tx).and_modify(|dep_tx| {
145+
dep_tx.output_tx.insert(*tx_hash);
146+
depth_size = depth_size.max(dep_tx.depth_size + dep_tx.size);
147+
});
279148
}
149+
txs.entry(*tx_hash).and_modify(|dep_tx| {
150+
dep_tx.depth_size = depth_size;
151+
});
280152
}
281-
debug!("Reordered txs: {:?}", reordered);
282-
reordered
283-
.iter()
284-
.filter_map(|tx_hash| txs.remove(tx_hash))
285-
.collect()
286153
}
287154

288155
async fn grouping_to_chains_connex(
@@ -540,14 +407,24 @@ pub async fn dependence_chains(
540407
connex: bool,
541408
across_blocks: bool,
542409
) -> OrderedChains {
410+
ensure_logs_order(logs);
543411
let (ordered_hash, mut txs) = scan_transactions(logs);
544412
let mut used_txs_chains: HashMap<
545413
TransactionHash,
546414
HashSet<TransactionHash>,
547415
> = HashMap::with_capacity(txs.len());
548-
fill_tx_dependence_maps(&mut txs, &mut used_txs_chains, past_chains).await;
416+
fill_tx_dependence_maps(
417+
&ordered_hash,
418+
&mut txs,
419+
&mut used_txs_chains,
420+
past_chains,
421+
)
422+
.await;
549423
debug!("Transactions: {:?}", txs.values());
550-
let mut ordered_txs = topological_order(ordered_hash, txs);
424+
let mut ordered_txs: Vec<_> = ordered_hash
425+
.iter()
426+
.filter_map(|tx_hash| txs.remove(tx_hash))
427+
.collect();
551428
let chains = if connex {
552429
grouping_to_chains_connex(&mut ordered_txs).await
553430
} else {
@@ -613,6 +490,8 @@ mod tests {
613490
is_allowed: bool,
614491
tx: TransactionHash,
615492
) {
493+
static COUNTER: std::sync::atomic::AtomicU64 =
494+
std::sync::atomic::AtomicU64::new(0);
616495
logs.push(LogTfhe {
617496
event: tfhe_event(e),
618497
is_allowed,
@@ -621,6 +500,9 @@ mod tests {
621500
transaction_hash: Some(tx),
622501
dependence_chain: TransactionHash::ZERO,
623502
tx_depth_size: 0,
503+
log_index: Some(
504+
COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
505+
),
624506
})
625507
}
626508

@@ -682,25 +564,6 @@ mod tests {
682564
result
683565
}
684566

685-
fn allowed_input_handle(
686-
logs: &mut Vec<LogTfhe>,
687-
tx: TransactionHash,
688-
) -> Handle {
689-
let result = new_handle();
690-
push_event(
691-
E::TrivialEncrypt(C::TrivialEncrypt {
692-
caller: caller(),
693-
pt: ClearConst::from_be_slice(&[0]),
694-
toType: 0,
695-
result,
696-
}),
697-
logs,
698-
true,
699-
tx,
700-
);
701-
result
702-
}
703-
704567
fn input_shared_handle(
705568
logs: &mut Vec<LogTfhe>,
706569
handle: Handle,
@@ -798,30 +661,6 @@ mod tests {
798661
assert_eq!(cache.read().await.len(), 2);
799662
}
800663

801-
#[tokio::test]
802-
#[tracing_test::traced_test]
803-
async fn test_dependence_chains_2_local_chain_bad_tx_order() {
804-
let cache = ChainCache::new(lru::LruCache::new(
805-
std::num::NonZeroUsize::new(100).unwrap(),
806-
));
807-
let mut logs = vec![];
808-
let tx1 = TransactionHash::with_last_byte(0);
809-
let tx2 = TransactionHash::with_last_byte(1);
810-
811-
let va_1 = allowed_input_handle(&mut logs, tx1);
812-
let _vb_1 = op1(va_1, &mut logs, tx1);
813-
let _vb_2 = op1(va_1, &mut logs, tx2);
814-
815-
let line = logs.pop().unwrap();
816-
logs.insert(0, line);
817-
818-
let chains = dependence_chains(&mut logs, &cache, false, true).await;
819-
assert!(logs_contain("Out of order"));
820-
assert_eq!(chains.len(), 1);
821-
assert!(logs.iter().all(|log| log.dependence_chain == tx1));
822-
assert_eq!(cache.read().await.len(), 3);
823-
}
824-
825664
#[tokio::test]
826665
async fn test_dependence_chains_2_local_chain_mixed() {
827666
let cache = ChainCache::new(lru::LruCache::new(
@@ -987,6 +826,26 @@ mod tests {
987826
assert_eq!(chains.len(), 1);
988827
}
989828

829+
#[tokio::test]
830+
async fn test_dependence_chains_dep_with_bad_order() {
831+
let cache = ChainCache::new(lru::LruCache::new(
832+
std::num::NonZeroUsize::new(100).unwrap(),
833+
));
834+
let mut logs = vec![];
835+
let tx1 = TransactionHash::with_last_byte(1);
836+
let tx2 = TransactionHash::with_last_byte(2);
837+
let va_1 = input_handle(&mut logs, tx1);
838+
let vb_1 = op1(va_1, &mut logs, tx1);
839+
let _va_1 = op1(vb_1, &mut logs, tx2);
840+
let last = logs.pop().unwrap();
841+
logs.insert(0, last);
842+
assert!(logs[0].transaction_hash == Some(tx2));
843+
let chains = dependence_chains(&mut logs, &cache, false, true).await;
844+
// answer is the same as with good order
845+
assert!(logs.iter().all(|log| log.dependence_chain == tx1));
846+
assert_eq!(chains.len(), 1);
847+
}
848+
990849
#[tokio::test]
991850
async fn test_dependence_chains_2_local_non_allowed_handle() {
992851
let cache = ChainCache::new(lru::LruCache::new(

coprocessor/fhevm-engine/host-listener/src/database/ingest.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ pub async fn ingest_block_logs(
111111
is_allowed: false,
112112
dependence_chain: Default::default(),
113113
tx_depth_size: 0,
114+
log_index: log.log_index,
114115
};
115116
tfhe_event_log.push(log);
116117
continue;

0 commit comments

Comments
 (0)