accountant -> bank
garious committed May 14, 2018
commit d2dd005a596507ae2653734d51a7417c82b80804
188 changes: 85 additions & 103 deletions src/accountant.rs → src/bank.rs

Large diffs are not rendered by default.
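Because the src/accountant.rs → src/bank.rs diff is collapsed, the renamed type's surface has to be read off its call sites in the files below (new_from_deposit, register_entry_id, process_verified_events, process_verified_transaction, get_balance, last_id, transaction_count). A minimal sketch of that implied surface follows; the Hash and PublicKey aliases, field layout, and method bodies are placeholder assumptions, not the actual src/bank.rs:

// Sketch only: Bank's public surface as implied by call sites in this
// commit. Placeholder types and bodies; not the real implementation.
use std::collections::HashSet;
use std::sync::Mutex;

type Hash = [u8; 32];      // stand-in for solana's Hash
type PublicKey = [u8; 32]; // stand-in for solana's PublicKey

pub struct Bank {
    last_id: Mutex<Hash>,
    entry_ids: Mutex<HashSet<Hash>>,
    transaction_count: Mutex<usize>,
}

impl Bank {
    /// Registers an entry id so later transactions may reference it.
    pub fn register_entry_id(&self, entry_id: &Hash) {
        self.entry_ids.lock().unwrap().insert(*entry_id);
        *self.last_id.lock().unwrap() = *entry_id;
    }

    /// The most recently registered entry id, served via Request::GetLastId.
    pub fn last_id(&self) -> Hash {
        *self.last_id.lock().unwrap()
    }

    /// Transactions processed so far, served via Request::GetTransactionCount.
    pub fn transaction_count(&self) -> usize {
        *self.transaction_count.lock().unwrap()
    }

    /// Balance lookup, served via Request::GetBalance.
    pub fn get_balance(&self, _key: &PublicKey) -> Option<i64> {
        None // placeholder; the real Bank tracks per-key balances
    }
}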

2 changes: 1 addition & 1 deletion src/bin/genesis-demo.rs
@@ -7,7 +7,7 @@ extern crate untrusted;

use isatty::stdin_isatty;
use rayon::prelude::*;
-use solana::accountant::MAX_ENTRY_IDS;
+use solana::bank::MAX_ENTRY_IDS;
use solana::entry::{create_entry, next_entry};
use solana::event::Event;
use solana::mint::MintDemo;
16 changes: 8 additions & 8 deletions src/bin/testnode.rs
@@ -6,7 +6,7 @@ extern crate solana;

use getopts::Options;
use isatty::stdin_isatty;
-use solana::accountant::Accountant;
+use solana::bank::Bank;
use solana::crdt::ReplicatedData;
use solana::entry::Entry;
use solana::event::Event;
@@ -92,31 +92,31 @@ fn main() {
None
};

eprintln!("creating accountant...");
eprintln!("creating bank...");

-let accountant = Accountant::new_from_deposit(&deposit.unwrap());
-accountant.register_entry_id(&entry0.id);
-accountant.register_entry_id(&entry1.id);
+let bank = Bank::new_from_deposit(&deposit.unwrap());
+bank.register_entry_id(&entry0.id);
+bank.register_entry_id(&entry1.id);

eprintln!("processing entries...");

let mut last_id = entry1.id;
for entry in entries {
last_id = entry.id;
-let results = accountant.process_verified_events(entry.events);
+let results = bank.process_verified_events(entry.events);
for result in results {
if let Err(e) = result {
eprintln!("failed to process event {:?}", e);
exit(1);
}
}
-accountant.register_entry_id(&last_id);
+bank.register_entry_id(&last_id);
}

eprintln!("creating networking stack...");

let exit = Arc::new(AtomicBool::new(false));
-let rpu = Rpu::new(accountant, last_id, Some(Duration::from_millis(1000)));
+let rpu = Rpu::new(bank, last_id, Some(Duration::from_millis(1000)));
let serve_sock = UdpSocket::bind(&serve_addr).unwrap();
let gossip_sock = UdpSocket::bind(&gossip_addr).unwrap();
let replicate_sock = UdpSocket::bind(&replicate_addr).unwrap();
2 changes: 1 addition & 1 deletion src/crdt.rs
@@ -11,7 +11,7 @@
//! * layer 1 - As many nodes as we can fit
//! * layer 2 - Everyone else, if layer 1 is `2^10`, layer 2 should be able to fit `2^20` number of nodes.
//!
-//! Accountant needs to provide an interface for us to query the stake weight
+//! Bank needs to provide an interface for us to query the stake weight

use bincode::{deserialize, serialize};
use byteorder::{LittleEndian, ReadBytesExt};
12 changes: 6 additions & 6 deletions src/entry_writer.rs
@@ -1,6 +1,6 @@
//! The `entry_writer` module helps implement the TPU's write stage.

-use accountant::Accountant;
+use bank::Bank;
use entry::Entry;
use ledger;
use packet;
@@ -15,18 +15,18 @@ use std::time::Duration;
use streamer;

pub struct EntryWriter<'a> {
accountant: &'a Accountant,
bank: &'a Bank,
}

impl<'a> EntryWriter<'a> {
-/// Create a new Tpu that wraps the given Accountant.
-pub fn new(accountant: &'a Accountant) -> Self {
-EntryWriter { accountant }
+/// Create a new Tpu that wraps the given Bank.
+pub fn new(bank: &'a Bank) -> Self {
+EntryWriter { bank }
}

fn write_entry<W: Write>(&self, writer: &Mutex<W>, entry: &Entry) {
trace!("write_entry entry");
-self.accountant.register_entry_id(&entry.id);
+self.bank.register_entry_id(&entry.id);
writeln!(
writer.lock().expect("'writer' lock in fn fn write_entry"),
"{}",
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -1,5 +1,5 @@
#![cfg_attr(feature = "unstable", feature(test))]
-pub mod accountant;
+pub mod bank;
pub mod crdt;
pub mod ecdsa;
pub mod entry;
18 changes: 9 additions & 9 deletions src/request_processor.rs
@@ -1,6 +1,6 @@
//! The `request_stage` processes thin client Request messages.

-use accountant::Accountant;
+use bank::Bank;
use bincode::{deserialize, serialize};
use event::Event;
use packet;
@@ -19,13 +19,13 @@ use streamer;
use timing;

pub struct RequestProcessor {
accountant: Arc<Accountant>,
bank: Arc<Bank>,
}

impl RequestProcessor {
-/// Create a new Tpu that wraps the given Accountant.
-pub fn new(accountant: Arc<Accountant>) -> Self {
-RequestProcessor { accountant }
+/// Create a new Tpu that wraps the given Bank.
+pub fn new(bank: Arc<Bank>) -> Self {
+RequestProcessor { bank }
}

/// Process Request items sent by clients.
@@ -36,19 +36,19 @@ impl RequestProcessor {
) -> Option<(Response, SocketAddr)> {
match msg {
Request::GetBalance { key } => {
-let val = self.accountant.get_balance(&key);
+let val = self.bank.get_balance(&key);
let rsp = (Response::Balance { key, val }, rsp_addr);
info!("Response::Balance {:?}", rsp);
Some(rsp)
}
Request::GetLastId => {
-let id = self.accountant.last_id();
+let id = self.bank.last_id();
let rsp = (Response::LastId { id }, rsp_addr);
info!("Response::LastId {:?}", rsp);
Some(rsp)
}
Request::GetTransactionCount => {
-let transaction_count = self.accountant.transaction_count() as u64;
+let transaction_count = self.bank.transaction_count() as u64;
let rsp = (Response::TransactionCount { transaction_count }, rsp_addr);
info!("Response::TransactionCount {:?}", rsp);
Some(rsp)
@@ -174,7 +174,7 @@ impl RequestProcessor {
debug!("events: {} reqs: {}", events.len(), reqs.len());

debug!("process_events");
-let results = self.accountant.process_verified_events(events);
+let results = self.bank.process_verified_events(events);
let events = results.into_iter().filter_map(|x| x.ok()).collect();
signal_sender.send(Signal::Events(events))?;
debug!("done process_events");
34 changes: 17 additions & 17 deletions src/request_stage.rs
@@ -52,9 +52,9 @@ impl RequestStage {
}
}

-// TODO: When accounting is pulled out of RequestStage, add this test back in.
+// TODO: When banking is pulled out of RequestStage, add this test back in.

-//use accountant::Accountant;
+//use bank::Bank;
//use entry::Entry;
//use event::Event;
//use hash::Hash;
@@ -67,23 +67,23 @@ impl RequestStage {
//
//#[cfg(test)]
//mod tests {
-// use accountant::Accountant;
+// use bank::Bank;
// use event::Event;
// use event_processor::EventProcessor;
// use mint::Mint;
// use signature::{KeyPair, KeyPairUtil};
// use transaction::Transaction;
//
// #[test]
-// // TODO: Move this test accounting_stage. Calling process_events() directly
+// // TODO: Move this test banking_stage. Calling process_events() directly
// // defeats the purpose of this test.
-// fn test_accounting_sequential_consistency() {
+// fn test_banking_sequential_consistency() {
// // In this attack we'll demonstrate that a verifier can interpret the ledger
// // differently if either the server doesn't signal the ledger to add an
// // Entry OR if the verifier tries to parallelize across multiple Entries.
// let mint = Mint::new(2);
-// let accountant = Accountant::new(&mint);
-// let event_processor = EventProcessor::new(accountant, &mint.last_id(), None);
+// let bank = Bank::new(&mint);
+// let event_processor = EventProcessor::new(bank, &mint.last_id(), None);
//
// // Process a batch that includes a transaction that receives two tokens.
// let alice = KeyPair::new();
@@ -96,30 +96,30 @@ impl RequestStage {
// let events = vec![Event::Transaction(tr)];
// let entry1 = event_processor.process_events(events).unwrap();
//
-// // Collect the ledger and feed it to a new accountant.
+// // Collect the ledger and feed it to a new bank.
// let entries = vec![entry0, entry1];
//
// // Assert the user holds one token, not two. If the server only output one
// // entry, then the second transaction will be rejected, because it drives
// // the account balance below zero before the credit is added.
-// let accountant = Accountant::new(&mint);
+// let bank = Bank::new(&mint);
// for entry in entries {
// assert!(
-// accountant
+// bank
// .process_verified_events(entry.events)
// .into_iter()
// .all(|x| x.is_ok())
// );
// }
-// assert_eq!(accountant.get_balance(&alice.pubkey()), Some(1));
+// assert_eq!(bank.get_balance(&alice.pubkey()), Some(1));
// }
//}
//
//#[cfg(all(feature = "unstable", test))]
//mod bench {
// extern crate test;
// use self::test::Bencher;
-// use accountant::{Accountant, MAX_ENTRY_IDS};
+// use bank::{Bank, MAX_ENTRY_IDS};
// use bincode::serialize;
// use event_processor::*;
// use hash::hash;
@@ -133,7 +133,7 @@ impl RequestStage {
// #[bench]
// fn process_events_bench(_bencher: &mut Bencher) {
// let mint = Mint::new(100_000_000);
-// let accountant = Accountant::new(&mint);
+// let bank = Bank::new(&mint);
// // Create transactions between unrelated parties.
// let txs = 100_000;
// let last_ids: Mutex<HashSet<Hash>> = Mutex::new(HashSet::new());
@@ -147,18 +147,18 @@ impl RequestStage {
// let mut last_ids = last_ids.lock().unwrap();
// if !last_ids.contains(&last_id) {
// last_ids.insert(last_id);
-// accountant.register_entry_id(&last_id);
+// bank.register_entry_id(&last_id);
// }
// }
//
// // Seed the 'from' account.
// let rando0 = KeyPair::new();
// let tr = Transaction::new(&mint.keypair(), rando0.pubkey(), 1_000, last_id);
-// accountant.process_verified_transaction(&tr).unwrap();
+// bank.process_verified_transaction(&tr).unwrap();
//
// let rando1 = KeyPair::new();
// let tr = Transaction::new(&rando0, rando1.pubkey(), 2, last_id);
-// accountant.process_verified_transaction(&tr).unwrap();
+// bank.process_verified_transaction(&tr).unwrap();
//
// // Finally, return a transaction that's unique
// Transaction::new(&rando0, rando1.pubkey(), 1, last_id)
@@ -170,7 +170,7 @@ impl RequestStage {
// .map(|tr| Event::Transaction(tr))
// .collect();
//
-// let event_processor = EventProcessor::new(accountant, &mint.last_id(), None);
+// let event_processor = EventProcessor::new(bank, &mint.last_id(), None);
//
// let now = Instant::now();
// assert!(event_processor.process_events(events).is_ok());
10 changes: 5 additions & 5 deletions src/result.rs
@@ -1,6 +1,6 @@
//! The `result` module exposes a Result type that propagates one of many different Error types.

-use accountant;
+use bank;
use bincode;
use serde_json;
use std;
@@ -15,7 +15,7 @@ pub enum Error {
RecvError(std::sync::mpsc::RecvError),
RecvTimeoutError(std::sync::mpsc::RecvTimeoutError),
Serialize(std::boxed::Box<bincode::ErrorKind>),
-AccountingError(accountant::AccountingError),
+BankError(bank::BankError),
SendError,
Services,
GeneralError,
@@ -33,9 +33,9 @@ impl std::convert::From<std::sync::mpsc::RecvTimeoutError> for Error {
Error::RecvTimeoutError(e)
}
}
impl std::convert::From<accountant::AccountingError> for Error {
fn from(e: accountant::AccountingError) -> Error {
Error::AccountingError(e)
impl std::convert::From<bank::BankError> for Error {
fn from(e: bank::BankError) -> Error {
Error::BankError(e)
}
}
impl<T> std::convert::From<std::sync::mpsc::SendError<T>> for Error {
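The From impl above is what lets a bank failure propagate through the `?` operator into the crate-wide Result. A self-contained sketch of that mechanism; the BankError variant and the withdraw/settle functions are invented for illustration, since the real variants are not shown in this diff:

// Hypothetical sketch: `?` uses the From impl to convert a bank failure
// into the crate-wide Error. All names here are invented for illustration.
#[derive(Debug)]
enum BankError {
    InsufficientFunds,
}

#[derive(Debug)]
enum Error {
    BankError(BankError),
}

impl From<BankError> for Error {
    fn from(e: BankError) -> Error {
        Error::BankError(e)
    }
}

type Result<T> = std::result::Result<T, Error>;

fn withdraw(balance: i64, amount: i64) -> std::result::Result<i64, BankError> {
    if amount > balance {
        return Err(BankError::InsufficientFunds);
    }
    Ok(balance - amount)
}

fn settle() -> Result<()> {
    let _remaining = withdraw(1, 2)?; // BankError -> Error::BankError via From
    Ok(())
}

fn main() {
    // The bank error surfaces as the crate-wide Error variant.
    println!("{:?}", settle()); // Err(BankError(InsufficientFunds))
}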
18 changes: 9 additions & 9 deletions src/rpu.rs
@@ -1,7 +1,7 @@
//! The `rpu` module implements the Request Processing Unit, a
//! 5-stage transaction processing pipeline in software.

-use accountant::Accountant;
+use bank::Bank;
use crdt::{Crdt, ReplicatedData};
use entry::Entry;
use entry_writer::EntryWriter;
@@ -22,31 +22,31 @@ use std::time::Duration;
use streamer;

pub struct Rpu {
accountant: Arc<Accountant>,
bank: Arc<Bank>,
start_hash: Hash,
tick_duration: Option<Duration>,
}

impl Rpu {
-/// Create a new Rpu that wraps the given Accountant.
-pub fn new(accountant: Accountant, start_hash: Hash, tick_duration: Option<Duration>) -> Self {
+/// Create a new Rpu that wraps the given Bank.
+pub fn new(bank: Bank, start_hash: Hash, tick_duration: Option<Duration>) -> Self {
Rpu {
-accountant: Arc::new(accountant),
+bank: Arc::new(bank),
start_hash,
tick_duration,
}
}

fn write_service<W: Write + Send + 'static>(
accountant: Arc<Accountant>,
bank: Arc<Bank>,
exit: Arc<AtomicBool>,
broadcast: streamer::BlobSender,
blob_recycler: packet::BlobRecycler,
writer: Mutex<W>,
entry_receiver: Receiver<Entry>,
) -> JoinHandle<()> {
spawn(move || loop {
let entry_writer = EntryWriter::new(&accountant);
let entry_writer = EntryWriter::new(&bank);
let _ = entry_writer.write_and_send_entries(
&broadcast,
&blob_recycler,
@@ -91,7 +91,7 @@ impl Rpu {
let sig_verify_stage = SigVerifyStage::new(exit.clone(), packet_receiver);

let blob_recycler = packet::BlobRecycler::default();
-let request_processor = RequestProcessor::new(self.accountant.clone());
+let request_processor = RequestProcessor::new(self.bank.clone());
let request_stage = RequestStage::new(
request_processor,
exit.clone(),
@@ -108,7 +108,7 @@ impl Rpu {

let (broadcast_sender, broadcast_receiver) = channel();
let t_write = Self::write_service(
-self.accountant.clone(),
+self.bank.clone(),
exit.clone(),
broadcast_sender,
blob_recycler.clone(),
2 changes: 1 addition & 1 deletion src/streamer.rs
@@ -322,7 +322,7 @@ fn retransmit(
/// # Arguments
/// * `sock` - Socket to read from. Read timeout is set to 1.
/// * `exit` - Boolean to signal system exit.
-/// * `crdt` - This structure needs to be updated and populated by the accountant and via gossip.
+/// * `crdt` - This structure needs to be updated and populated by the bank and via gossip.
/// * `recycler` - Blob recycler.
/// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn retransmitter(