1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
@@ -40,6 +40,7 @@ futures = "0.3"
env_logger = "0.9"
mimalloc = { version = "0.1", optional = true, default-features = false }
snmalloc-rs = {version = "0.2", optional = true, features= ["cache-friendly"] }
rand = "0.8.4"

[dev-dependencies]
ballista-core = { path = "../ballista/rust/core" }
15 changes: 15 additions & 0 deletions benchmarks/README.md
@@ -178,5 +178,20 @@ Query 'fare_amt_by_passenger' iteration 1 took 7599 ms
Query 'fare_amt_by_passenger' iteration 2 took 7969 ms
```

## Running the Ballista Loadtest

```bash
cargo run --bin tpch -- loadtest ballista-load \
    --query-list 1,3,5,6,7,10,12,13 \
    --requests 200 \
    --concurrency 10 \
    --data-path /**** \
    --format parquet \
    --host localhost \
    --port 50050 \
    --sql-path /*** \
    --debug
```
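
The `--host` and `--port` options point at the Ballista scheduler (50050 is its default port). The total request count is split evenly across the client connections, so in the example above each of the 10 clients issues 20 requests, each running a query picked at random from `--query-list`. The `--sql-path` directory must contain one file per query, named `q1.sql` through `q22.sql`.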

[1]: http://www.tpc.org/tpch/
[2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
239 changes: 208 additions & 31 deletions benchmarks/src/bin/tpch.rs
@@ -17,6 +17,9 @@

//! Benchmark derived from TPC-H. This is not an official TPC-H benchmark.

use futures::future::join_all;
use rand::prelude::*;
use std::ops::Div;
use std::{
fs,
iter::Iterator,
@@ -137,6 +140,48 @@ struct DataFusionBenchmarkOpt {
mem_table: bool,
}

#[derive(Debug, StructOpt, Clone)]
struct BallistaLoadtestOpt {
#[structopt(short = "q", long)]
query_list: String,

/// Activate debug mode to see query results
#[structopt(short, long)]
debug: bool,

/// Total number of requests, divided evenly across client connections
#[structopt(short = "r", long = "requests", default_value = "100")]
requests: usize,

/// Number of concurrent client connections
#[structopt(short = "c", long = "concurrency", default_value = "5")]
concurrency: usize,

/// Number of partitions to process in parallel
#[structopt(short = "n", long = "partitions", default_value = "2")]
partitions: usize,

/// Path to data files
#[structopt(parse(from_os_str), required = true, short = "p", long = "data-path")]
path: PathBuf,

/// Path to sql files
#[structopt(parse(from_os_str), required = true, long = "sql-path")]
sql_path: PathBuf,

/// File format: `csv` or `parquet`
#[structopt(short = "f", long = "format", default_value = "parquet")]
file_format: String,

/// Ballista scheduler host
#[structopt(long = "host")]
host: Option<String>,

/// Ballista scheduler port
#[structopt(long = "port")]
port: Option<u16>,
}

#[derive(Debug, StructOpt)]
struct ConvertOpt {
/// Path to csv files
@@ -173,11 +218,19 @@ enum BenchmarkSubCommandOpt {
DataFusionBenchmark(DataFusionBenchmarkOpt),
}

#[derive(Debug, StructOpt)]
#[structopt(about = "loadtest command")]
enum LoadtestOpt {
#[structopt(name = "ballista-load")]
BallistaLoadtest(BallistaLoadtestOpt),
}
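
The nested enums yield the two-level invocation used in the README: `loadtest` is the outer subcommand and `ballista-load` the inner one.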

#[derive(Debug, StructOpt)]
#[structopt(name = "TPC-H", about = "TPC-H Benchmarks.")]
enum TpchOpt {
Benchmark(BenchmarkSubCommandOpt),
Convert(ConvertOpt),
Loadtest(LoadtestOpt),
}

const TABLES: &[&str] = &[
@@ -187,6 +240,7 @@
#[tokio::main]
async fn main() -> Result<()> {
use BenchmarkSubCommandOpt::*;
use LoadtestOpt::*;

env_logger::init();
match TpchOpt::from_args() {
@@ -197,6 +251,9 @@ async fn main() -> Result<()> {
benchmark_datafusion(opt).await.map(|_| ())
}
TpchOpt::Convert(opt) => convert_tbl(opt).await,
TpchOpt::Loadtest(BallistaLoadtest(opt)) => {
loadtest_ballista(opt).await.map(|_| ())
}
}
}

@@ -268,6 +325,151 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
// register tables with Ballista context
let path = opt.path.to_str().unwrap();
let file_format = opt.file_format.as_str();

register_tables(path, file_format, &ctx).await;

let mut millis = vec![];

// run benchmark
let sql = get_query_sql(opt.query)?;
println!("Running benchmark with query {}:\n {}", opt.query, sql);
for i in 0..opt.iterations {
let start = Instant::now();
let df = ctx
.sql(&sql)
.await
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
.unwrap();
let batches = df
.collect()
.await
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
.unwrap();
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
millis.push(elapsed as f64);
println!("Query {} iteration {} took {:.1} ms", opt.query, i, elapsed);
if opt.debug {
pretty::print_batches(&batches)?;
}
}

let avg = millis.iter().sum::<f64>() / millis.len() as f64;
println!("Query {} avg time: {:.2} ms", opt.query, avg);

Ok(())
}

async fn loadtest_ballista(opt: BallistaLoadtestOpt) -> Result<()> {
println!(
"Running loadtest_ballista with the following options: {:?}",
opt
);

let config = BallistaConfig::builder()
.set(
BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
&format!("{}", opt.partitions),
)
.build()
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

let concurrency = opt.concurrency;
let request_amount = opt.requests;
let mut clients = vec![];

for _num in 0..concurrency {
clients.push(BallistaContext::remote(
opt.host.clone().unwrap().as_str(),
opt.port.unwrap(),
&config,
));
}

// register tables with Ballista context
let path = opt.path.to_str().unwrap();
let file_format = opt.file_format.as_str();
let sql_path = opt.sql_path.to_str().unwrap().to_string();

for ctx in &clients {
register_tables(path, file_format, ctx).await;
}

let request_per_thread = request_amount.div(concurrency);
// run benchmark
let query_list: Vec<usize> = opt
.query_list
.split(',')
.map(|s| s.parse().unwrap())
.collect();
println!("query list: {:?} ", &query_list);

let total = Instant::now();
let mut futures = vec![];

for (client_id, client) in clients.into_iter().enumerate() {
let query_list_clone = query_list.clone();
let sql_path_clone = sql_path.clone();
let handle = tokio::spawn(async move {
for i in 0..request_per_thread {
let query_id = query_list_clone
.get(
(0..query_list_clone.len())
.choose(&mut rand::thread_rng())
.unwrap(),
)
.unwrap();
let sql =
get_query_sql_by_path(query_id.to_owned(), sql_path_clone.clone())
.unwrap();
println!(
"Client {} Round {} Query {} started",
&client_id, &i, query_id
);
let start = Instant::now();
let df = client
.sql(&sql)
.await
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
.unwrap();
let batches = df
.collect()
.await
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
.unwrap();
let elapsed = start.elapsed().as_secs_f64() * 1000.0;
println!(
"Client {} Round {} Query {} took {:.1} ms ",
&client_id, &i, query_id, elapsed
);
if opt.debug {
pretty::print_batches(&batches).unwrap();
}
}
});
futures.push(handle);
}
join_all(futures).await;
let elapsed = total.elapsed().as_secs_f64() * 1000.0;
println!("###############################");
println!("load test took {:.1} ms", elapsed);
Ok(())
}
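
A note on the random query selection inside the spawned task above: drawing a random index from `0..query_list_clone.len()` and then `get`ting it can be collapsed into `SliceRandom::choose`, which `rand::prelude::*` already brings into scope. A minimal equivalent sketch, not part of this diff:

```rust
use rand::prelude::*; // SliceRandom::choose comes from the prelude

// Pick one query number at random from the slice; panics only when the
// list is empty, matching the unwrap()-based behaviour of the original.
let query_id = *query_list_clone
    .choose(&mut rand::thread_rng())
    .expect("query list must not be empty");
```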

fn get_query_sql_by_path(query: usize, mut sql_path: String) -> Result<String> {
if sql_path.ends_with('/') {
sql_path.pop();
}
if query > 0 && query < 23 {
let filename = format!("{}/q{}.sql", sql_path, query);
Ok(fs::read_to_string(&filename).expect("failed to read query"))
} else {
Err(DataFusionError::Plan(
"invalid query. Expected value between 1 and 22".to_owned(),
))
}
}
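
For illustration, a hypothetical caller of `get_query_sql_by_path` (the `./sql` path and query number are invented for the example, inside a function returning `Result`):

```rust
// Reads ./sql/q6.sql; a query number outside 1..=22 yields a Plan error,
// and a missing file panics via the expect() inside the helper.
let sql = get_query_sql_by_path(6, "./sql".to_string())?;
println!("{}", sql);
```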

async fn register_tables(path: &str, file_format: &str, ctx: &BallistaContext) {
for table in TABLES {
match file_format {
// dbgen creates .tbl ('|' delimited) files without header
@@ -281,55 +483,30 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
.file_extension(".tbl");
ctx.register_csv(table, &path, options)
.await
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
.unwrap();
}
"csv" => {
let path = format!("{}/{}", path, table);
let schema = get_schema(table);
let options = CsvReadOptions::new().schema(&schema).has_header(true);
ctx.register_csv(table, &path, options)
.await
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
.unwrap();
}
"parquet" => {
let path = format!("{}/{}", path, table);
ctx.register_parquet(table, &path)
.await
.map_err(|e| DataFusionError::Plan(format!("{:?}", e)))
.unwrap();
}
other => {
unimplemented!("Invalid file format '{}'", other);
}
}
}

}

fn get_query_sql(query: usize) -> Result<String> {