Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod thin_client;
pub mod timing;
pub mod transaction;
pub mod tvu;
pub mod write_stage;
extern crate bincode;
extern crate byteorder;
extern crate chrono;
Expand Down
40 changes: 7 additions & 33 deletions src/rpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

use bank::Bank;
use crdt::{Crdt, ReplicatedData};
use entry::Entry;
use entry_writer::EntryWriter;
use hash::Hash;
use packet;
use record_stage::RecordStage;
Expand All @@ -14,12 +12,13 @@ use result::Result;
use sig_verify_stage::SigVerifyStage;
use std::io::Write;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{spawn, JoinHandle};
use std::thread::JoinHandle;
use std::time::Duration;
use streamer;
use write_stage::WriteStage;

pub struct Rpu {
bank: Arc<Bank>,
Expand All @@ -37,29 +36,6 @@ impl Rpu {
}
}

fn write_service<W: Write + Send + 'static>(
bank: Arc<Bank>,
exit: Arc<AtomicBool>,
broadcast: streamer::BlobSender,
blob_recycler: packet::BlobRecycler,
writer: Mutex<W>,
entry_receiver: Receiver<Entry>,
) -> JoinHandle<()> {
spawn(move || loop {
let entry_writer = EntryWriter::new(&bank);
let _ = entry_writer.write_and_send_entries(
&broadcast,
&blob_recycler,
&writer,
&entry_receiver,
);
if exit.load(Ordering::Relaxed) {
info!("broadcat_service exiting");
break;
}
})
}

/// Create a UDP microservice that forwards messages the given Rpu.
/// This service is the network leader
/// Set `exit` to shutdown its threads.
Expand Down Expand Up @@ -106,11 +82,9 @@ impl Rpu {
self.tick_duration,
);

let (broadcast_sender, broadcast_receiver) = channel();
let t_write = Self::write_service(
let write_stage = WriteStage::new(
self.bank.clone(),
exit.clone(),
broadcast_sender,
blob_recycler.clone(),
Mutex::new(writer),
record_stage.entry_receiver,
Expand All @@ -122,7 +96,7 @@ impl Rpu {
exit.clone(),
crdt.clone(),
blob_recycler.clone(),
broadcast_receiver,
write_stage.blob_receiver,
);

let respond_socket = UdpSocket::bind(local.clone())?;
Expand All @@ -137,7 +111,7 @@ impl Rpu {
t_receiver,
t_responder,
request_stage.thread_hdl,
t_write,
write_stage.thread_hdl,
t_gossip,
t_listen,
t_broadcast,
Expand Down
28 changes: 5 additions & 23 deletions src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

use bank::Bank;
use crdt::{Crdt, ReplicatedData};
use entry::Entry;
use entry_writer::EntryWriter;
use hash::Hash;
use ledger;
use packet;
Expand All @@ -15,11 +13,12 @@ use result::Result;
use sig_verify_stage::SigVerifyStage;
use std::net::UdpSocket;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::thread::{spawn, JoinHandle};
use std::time::Duration;
use streamer;
use write_stage::WriteStage;

pub struct Tvu {
bank: Arc<Bank>,
Expand All @@ -37,23 +36,6 @@ impl Tvu {
}
}

fn drain_service(
bank: Arc<Bank>,
exit: Arc<AtomicBool>,
entry_receiver: Receiver<Entry>,
) -> JoinHandle<()> {
spawn(move || {
let entry_writer = EntryWriter::new(&bank);
loop {
let _ = entry_writer.drain_entries(&entry_receiver);
if exit.load(Ordering::Relaxed) {
info!("drain_service exiting");
break;
}
}
})
}

/// Process verified blobs, already in order
/// Respond with a signed hash of the state
fn replicate_state(
Expand Down Expand Up @@ -188,8 +170,8 @@ impl Tvu {
obj.tick_duration,
);

let t_write =
Self::drain_service(obj.bank.clone(), exit.clone(), record_stage.entry_receiver);
let write_stage =
WriteStage::new_drain(obj.bank.clone(), exit.clone(), record_stage.entry_receiver);

let t_responder = streamer::responder(
respond_socket,
Expand All @@ -210,7 +192,7 @@ impl Tvu {
t_packet_receiver,
t_responder,
request_stage.thread_hdl,
t_write,
write_stage.thread_hdl,
];
threads.extend(sig_verify_stage.thread_hdls.into_iter());
Ok(threads)
Expand Down
71 changes: 71 additions & 0 deletions src/write_stage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//! The `write_stage` module implements write stage of the RPU.

use bank::Bank;
use entry::Entry;
use entry_writer::EntryWriter;
use packet;
use std::io::Write;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle};
use streamer;

pub struct WriteStage {
pub thread_hdl: JoinHandle<()>,
pub blob_receiver: streamer::BlobReceiver,
}

impl WriteStage {
/// Create a new Rpu that wraps the given Bank.
pub fn new<W: Write + Send + 'static>(
bank: Arc<Bank>,
exit: Arc<AtomicBool>,
blob_recycler: packet::BlobRecycler,
writer: Mutex<W>,
entry_receiver: Receiver<Entry>,
) -> Self {
let (blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || loop {
let entry_writer = EntryWriter::new(&bank);
let _ = entry_writer.write_and_send_entries(
&blob_sender,
&blob_recycler,
&writer,
&entry_receiver,
);
if exit.load(Ordering::Relaxed) {
info!("broadcat_service exiting");
break;
}
});

WriteStage {
thread_hdl,
blob_receiver,
}
}

pub fn new_drain(
bank: Arc<Bank>,
exit: Arc<AtomicBool>,
entry_receiver: Receiver<Entry>,
) -> Self {
let (_blob_sender, blob_receiver) = channel();
let thread_hdl = spawn(move || {
let entry_writer = EntryWriter::new(&bank);
loop {
let _ = entry_writer.drain_entries(&entry_receiver);
if exit.load(Ordering::Relaxed) {
info!("drain_service exiting");
break;
}
}
});

WriteStage {
thread_hdl,
blob_receiver,
}
}
}