Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update
  • Loading branch information
aeyakovenko authored and sakridge committed Apr 27, 2018
commit 444adcd1ca04fcf42a9e916a9263fa0b25207fd3
47 changes: 30 additions & 17 deletions src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::io::Write;
use std::net::{SocketAddr, UdpSocket};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;
Expand Down Expand Up @@ -280,7 +280,7 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {

let skel = obj.clone();
let t_server = spawn(move || loop {
let e = AccountantSkel::process(
let e = Self::process(
&skel,
&verified_receiver,
&blob_sender,
Expand All @@ -294,9 +294,22 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
}

/// Create a UDP microservice that forwards messages the given AccountantSkel.
/// This service receives messages from a leader in the network
/// Set `exit` to shutdown its threads.
/// This service receives messages from a leader in the network and processes the transactions
/// on the accountant state.
/// # Arguments
/// * `obj` - The accoutnant state.
/// * `rsubs` - The subscribers.
/// * `exit` - The exit signal.
/// # Remarks
/// The pipeline is constructed as follows
/// 1. receive blobs from the network, these are out of order
/// 2. verify blobs, PoH, signatures
/// 3. reconstruct consequitive window
/// a. order the blobs
/// b. use erasure coding to reconstruct missing blobs
/// c. ask the network for missing blobs
/// 4. process the transaction state machine
/// 5. respond with the hash of the state back to the leader
pub fn replicate(
obj: &Arc<Mutex<AccountantSkel<W>>>,
rsubs: Subscribers,
Expand All @@ -323,26 +336,26 @@ impl<W: Write + Send + 'static> AccountantSkel<W> {
blob_recycler.clone(),
retransmit_receiver,
);


let t_window =
streamer::window(exit.clone(), blob_recycler.clone(), blob_receiver, window_sender, retransmit_sender);
//TODO
//the packets comming out of blob_receiver need to be sent to the GPU and verified
//then sent to the window, which does the erasure coding reconstruction
let t_window = streamer::window(
exit.clone(),
blob_recycler.clone(),
blob_receiver,
window_sender,
retransmit_sender,
);

let skel = obj.clone();
let t_server = spawn(move || loop {
let e = AccountantSkel::replicate(
&skel,
&verified_receiver,
&blob_sender,
&blob_recycler,
);
let e = Self::replicate_state(&skel, &window_receiver, &blob_sender, &blob_recycler);
if e.is_err() && exit.load(Ordering::Relaxed) {
break;
}
});
Ok(vec![t_receiver, t_responder, t_server, t_verifier])
Ok(vec![t_blob_receiver, t_retransmit, t_window, t_server])
}

}

#[cfg(test)]
Expand Down