Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions datafusion/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use super::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
};
use crate::arrow::array::BooleanBufferBuilder;
use crate::arrow::datatypes::TimeUnit;
use crate::physical_plan::coalesce_batches::concat_batches;
use crate::physical_plan::PhysicalExpr;
Expand Down Expand Up @@ -401,9 +402,13 @@ impl ExecutionPlan for HashJoinExec {
let num_rows = left_data.1.num_rows();
let visited_left_side = match self.join_type {
JoinType::Left | JoinType::Full | JoinType::Semi | JoinType::Anti => {
vec![false; num_rows]
let mut buffer = BooleanBufferBuilder::new(num_rows);

buffer.append_n(num_rows, false);

buffer
}
JoinType::Inner | JoinType::Right => vec![],
JoinType::Inner | JoinType::Right => BooleanBufferBuilder::new(0),
};
Ok(Box::pin(HashJoinStream::new(
self.schema.clone(),
Expand Down Expand Up @@ -502,8 +507,7 @@ struct HashJoinStream {
/// Random state used for hashing initialization
random_state: RandomState,
/// Keeps track of the left side rows whether they are visited
visited_left_side: Vec<bool>,
// TODO: use a more memory efficient data structure, https://github.com/apache/arrow-datafusion/issues/240
visited_left_side: BooleanBufferBuilder,
/// There is nothing to process anymore and left side is processed in case of left join
is_exhausted: bool,
/// Metrics
Expand All @@ -525,7 +529,7 @@ impl HashJoinStream {
right: SendableRecordBatchStream,
column_indices: Vec<ColumnIndex>,
random_state: RandomState,
visited_left_side: Vec<bool>,
visited_left_side: BooleanBufferBuilder,
join_metrics: HashJoinMetrics,
null_equals_null: bool,
) -> Self {
Expand Down Expand Up @@ -909,29 +913,21 @@ fn equal_rows(

// Produces a batch for left-side rows that have/have not been matched during the whole join
fn produce_from_matched(
visited_left_side: &[bool],
visited_left_side: &BooleanBufferBuilder,
schema: &SchemaRef,
column_indices: &[ColumnIndex],
left_data: &JoinLeftData,
unmatched: bool,
) -> ArrowResult<RecordBatch> {
// Find indices which didn't match any right row (are false)
let indices = if unmatched {
UInt64Array::from_iter_values(
visited_left_side
.iter()
.enumerate()
.filter(|&(_, &value)| !value)
.map(|(index, _)| index as u64),
(0..visited_left_side.len())
.filter_map(|v| (!visited_left_side.get_bit(v)).then(|| v as u64)),
)
} else {
// produce those that did match
UInt64Array::from_iter_values(
visited_left_side
.iter()
.enumerate()
.filter(|&(_, &value)| value)
.map(|(index, _)| index as u64),
(0..visited_left_side.len())
.filter_map(|v| (visited_left_side.get_bit(v)).then(|| v as u64)),
)
};

Expand Down Expand Up @@ -991,7 +987,7 @@ impl Stream for HashJoinStream {
| JoinType::Semi
| JoinType::Anti => {
left_side.iter().flatten().for_each(|x| {
self.visited_left_side[x as usize] = true;
self.visited_left_side.set_bit(x as usize, true);
});
}
JoinType::Inner | JoinType::Right => {}
Expand Down