Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,8 @@ name = "solana-client-demo"
path = "src/bin/client-demo.rs"

[[bin]]
name = "solana-multinode-demo"
path = "src/bin/multinode-demo.rs"

[[bin]]
name = "solana-testnode"
path = "src/bin/testnode.rs"
name = "solana-fullnode"
path = "src/bin/fullnode.rs"

[[bin]]
name = "solana-genesis"
Expand Down
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ $ git clone https://github.com/solana-labs/solana.git
$ cd solana
```

The testnode server is initialized with a ledger from stdin and
The fullnode server is initialized with a ledger from stdin and
generates new ledger entries on stdout. To create the input ledger, we'll need
to create *the mint* and use it to generate a *genesis ledger*. It's done in
two steps because the mint-demo.json file contains private keys that will be
Expand All @@ -50,7 +50,7 @@ used later in this demo.
Now you can start the server:

```bash
$ cat genesis.log | cargo run --release --bin solana-testnode > transactions0.log
$ cat genesis.log | cargo run --release --bin solana-fullnode > transactions0.log
```

Wait a few seconds for the server to initialize. It will print "Ready." when it's safe
Expand All @@ -76,7 +76,7 @@ Now restart the server from where we left off. Pass it both the genesis ledger,
the transaction ledger.

```bash
$ cat genesis.log transactions0.log | cargo run --release --bin solana-testnode > transactions1.log
$ cat genesis.log transactions0.log | cargo run --release --bin solana-fullnode > transactions1.log
```

Lastly, run the client demo again, and verify that all funds were spent in the
Expand Down Expand Up @@ -128,11 +128,11 @@ Debugging
---

There are some useful debug messages in the code, you can enable them on a per-module and per-level
basis with the normal RUST\_LOG environment variable. Run the testnode with this syntax:
basis with the normal RUST\_LOG environment variable. Run the fullnode with this syntax:
```bash
$ RUST_LOG=solana::streamer=debug,solana::accountant_skel=info cat genesis.log | ./target/release/solana-testnode > transactions0.log
$ RUST_LOG=solana::streamer=debug,solana::server=info cat genesis.log | ./target/release/solana-fullnode > transactions0.log
```
to see the debug and info sections for streamer and accountant\_skel respectively. Generally
to see the debug and info sections for streamer and server respectively. Generally
we are using debug for infrequent debug messages, trace for potentially frequent messages and
info for performance-related logging.

Expand Down
207 changes: 148 additions & 59 deletions src/bin/client-demo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,20 @@ use futures::Future;
use getopts::Options;
use isatty::stdin_isatty;
use rayon::prelude::*;
use solana::crdt::{Crdt, ReplicatedData};
use solana::mint::MintDemo;
use solana::signature::{GenKeys, KeyPairUtil};
use solana::signature::{GenKeys, KeyPair, KeyPairUtil};
use solana::streamer::default_window;
use solana::thin_client::ThinClient;
use solana::transaction::Transaction;
use std::env;
use std::fs::File;
use std::io::{stdin, Read};
use std::net::{SocketAddr, UdpSocket};
use std::process::exit;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use std::thread::sleep;
use std::time::Duration;
use std::time::Instant;
Expand All @@ -32,13 +38,19 @@ fn print_usage(program: &str, opts: Options) {

fn main() {
let mut threads = 4usize;
let mut server_addr: String = "127.0.0.1:8000".to_string();
let mut requests_addr: String = "127.0.0.1:8010".to_string();
let mut num_nodes = 10usize;
let mut leader = "leader.json".to_string();

let mut opts = Options::new();
opts.optopt("s", "", "server address", "host:port");
opts.optopt("l", "", "leader", "leader.json");
opts.optopt("c", "", "client address", "host:port");
opts.optopt("t", "", "number of threads", &format!("{}", threads));
opts.optopt(
"n",
"",
"number of nodes to converge to",
&format!("{}", num_nodes),
);
opts.optflag("h", "help", "print help");
let args: Vec<String> = env::args().collect();
let matches = match opts.parse(&args[1..]) {
Expand All @@ -54,19 +66,32 @@ fn main() {
print_usage(&program, opts);
return;
}
if matches.opt_present("s") {
server_addr = matches.opt_str("s").unwrap();
}
if matches.opt_present("c") {
requests_addr = matches.opt_str("c").unwrap();
if matches.opt_present("l") {
leader = matches.opt_str("l").unwrap();
}
let client_addr: Arc<RwLock<SocketAddr>> = if matches.opt_present("c") {
let addr = matches.opt_str("c").unwrap().parse().unwrap();
Arc::new(RwLock::new(addr))
} else {
Arc::new(RwLock::new("127.0.0.1:8010".parse().unwrap()))
};
if matches.opt_present("t") {
threads = matches.opt_str("t").unwrap().parse().expect("integer");
}
if matches.opt_present("n") {
num_nodes = matches.opt_str("n").unwrap().parse().expect("integer");
}

let mut transactions_addr: SocketAddr = requests_addr.parse().unwrap();
let requests_port = transactions_addr.port();
transactions_addr.set_port(requests_port + 1);
let leader: ReplicatedData = read_leader(leader);
let signal = Arc::new(AtomicBool::new(false));
let mut c_threads = vec![];
let validators = converge(
&client_addr,
&leader,
signal.clone(),
num_nodes + 2,
&mut c_threads,
);

if stdin_isatty() {
eprintln!("nothing found on stdin, expected a json file");
Expand All @@ -85,23 +110,7 @@ fn main() {
eprintln!("failed to parse json: {}", e);
exit(1);
});

println!("Binding to {}", requests_addr);
let requests_socket = UdpSocket::bind(&requests_addr).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(5, 0)))
.unwrap();
let transactions_socket = UdpSocket::bind(&transactions_addr).unwrap();
let requests_addr: SocketAddr = server_addr.parse().unwrap();
let requests_port = requests_addr.port();
let mut transactions_addr = requests_addr.clone();
transactions_addr.set_port(requests_port + 3);
let mut client = ThinClient::new(
requests_addr,
requests_socket,
transactions_addr,
transactions_socket,
);
let mut client = mk_client(&client_addr, &leader);

println!("Get last ID...");
let last_id = client.get_last_id().wait().unwrap();
Expand All @@ -120,7 +129,7 @@ fn main() {
.into_par_iter()
.map(|chunk| Transaction::new(&chunk[0], chunk[1].pubkey(), 1, last_id))
.collect();
let mut duration = now.elapsed();
let duration = now.elapsed();
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let bsps = txs as f64 / ns as f64;
let nsps = ns as f64 / txs as f64;
Expand All @@ -130,46 +139,126 @@ fn main() {
nsps / 1_000_f64
);

let initial_tx_count = client.transaction_count();
println!("initial count {}", initial_tx_count);
let first_count = client.transaction_count();
println!("initial count {}", first_count);

println!("Transfering {} transactions in {} batches", txs, threads);
let now = Instant::now();
let sz = transactions.len() / threads;
let chunks: Vec<_> = transactions.chunks(sz).collect();
chunks.into_par_iter().for_each(|txs| {
println!("Transferring 1 unit {} times... to", txs.len());
let requests_addr: SocketAddr = server_addr.parse().unwrap();
let mut requests_cb_addr = requests_addr.clone();
requests_cb_addr.set_port(0);
let requests_socket = UdpSocket::bind(requests_cb_addr).unwrap();
requests_socket
.set_read_timeout(Some(Duration::new(5, 0)))
.unwrap();
let mut transactions_addr: SocketAddr = requests_addr.clone();
transactions_addr.set_port(0);
let transactions_socket = UdpSocket::bind(&transactions_addr).unwrap();
let client = ThinClient::new(
requests_addr,
requests_socket,
transactions_addr,
transactions_socket,
);
let client = mk_client(&client_addr, &leader);
for tx in txs {
client.transfer_signed(tx.clone()).unwrap();
}
});

println!("Waiting for transactions to complete...",);
let mut tx_count;
for _ in 0..10 {
tx_count = client.transaction_count();
duration = now.elapsed();
let txs = tx_count - initial_tx_count;
println!("Transactions processed {}", txs);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (txs * 1_000_000_000) as f64 / ns as f64;
println!("{} tps", tps);
println!("Sampling tps every second...",);
validators.into_par_iter().for_each(|val| {
let mut client = mk_client(&client_addr, &val);
let mut now = Instant::now();
let mut initial_tx_count = client.transaction_count();
for i in 0..100 {
let tx_count = client.transaction_count();
let duration = now.elapsed();
now = Instant::now();
let sample = tx_count - initial_tx_count;
initial_tx_count = tx_count;
println!(
"{}: Transactions processed {}",
val.transactions_addr, sample
);
let ns = duration.as_secs() * 1_000_000_000 + u64::from(duration.subsec_nanos());
let tps = (sample * 1_000_000_000) as f64 / ns as f64;
println!("{}: {} tps", val.transactions_addr, tps);
let total = tx_count - first_count;
println!(
"{}: Total Transactions processed {}",
val.transactions_addr, total
);
if total == transactions.len() as u64 {
break;
}
if i > 20 && sample == 0 {
break;
}
sleep(Duration::new(1, 0));
}
});
signal.store(true, Ordering::Relaxed);
for t in c_threads {
t.join().unwrap();
}
}

fn mk_client(locked_addr: &Arc<RwLock<SocketAddr>>, r: &ReplicatedData) -> ThinClient {
let mut addr = locked_addr.write().unwrap();
let port = addr.port();
let transactions_socket = UdpSocket::bind(addr.clone()).unwrap();
addr.set_port(port + 1);
let requests_socket = UdpSocket::bind(addr.clone()).unwrap();
addr.set_port(port + 2);
ThinClient::new(
r.requests_addr,
requests_socket,
r.transactions_addr,
transactions_socket,
)
}

fn spy_node(client_addr: &Arc<RwLock<SocketAddr>>) -> (ReplicatedData, UdpSocket) {
let mut addr = client_addr.write().unwrap();
let port = addr.port();
let gossip = UdpSocket::bind(addr.clone()).unwrap();
addr.set_port(port + 1);
let daddr = "0.0.0.0:0".parse().unwrap();
let pubkey = KeyPair::new().pubkey();
let node = ReplicatedData::new(pubkey, gossip.local_addr().unwrap(), daddr, daddr, daddr);
(node, gossip)
}

fn converge(
client_addr: &Arc<RwLock<SocketAddr>>,
leader: &ReplicatedData,
exit: Arc<AtomicBool>,
num_nodes: usize,
threads: &mut Vec<JoinHandle<()>>,
) -> Vec<ReplicatedData> {
//lets spy on the network
let daddr = "0.0.0.0:0".parse().unwrap();
let (spy, spy_gossip) = spy_node(client_addr);
let mut spy_crdt = Crdt::new(spy);
spy_crdt.insert(&leader);
spy_crdt.set_leader(leader.id);

let spy_ref = Arc::new(RwLock::new(spy_crdt));
let spy_window = default_window();
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone());
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
//wait for the network to converge
for _ in 0..30 {
let min = spy_ref.read().unwrap().convergence();
if num_nodes as u64 == min {
println!("converged!");
break;
}
sleep(Duration::new(1, 0));
}
threads.push(t_spy_listen);
threads.push(t_spy_gossip);
let v: Vec<ReplicatedData> = spy_ref
.read()
.unwrap()
.table
.values()
.into_iter()
.filter(|x| x.requests_addr != daddr)
.map(|x| x.clone())
.collect();
v.clone()
}

fn read_leader(path: String) -> ReplicatedData {
let file = File::open(path).expect("file");
serde_json::from_reader(file).expect("parse")
}
File renamed without changes.
Loading