Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Improve aggregatation documentation
  • Loading branch information
alamb authored and korowa committed Aug 3, 2024
commit b3c033f5b6e2ab45e02c3aa686866c6b78050e72
105 changes: 103 additions & 2 deletions datafusion/expr/src/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub trait Accumulator: Send + Sync + Debug {
///
/// Intermediate state is used for "multi-phase" grouping in
/// DataFusion, where an aggregate is computed in parallel with
/// multiple `Accumulator` instances, as illustrated below:
/// multiple `Accumulator` instances, as described below:
///
/// # MultiPhase Grouping
///
Expand Down Expand Up @@ -130,7 +130,7 @@ pub trait Accumulator: Send + Sync + Debug {
/// `───────' `───────'
/// ```
///
/// The partial state is serialied as `Arrays` and then combined
/// The partial state is serialized as `Arrays` and then combined
/// with other partial states from different instances of this
/// Accumulator (that ran on different partitions, for example).
///
Expand All @@ -147,6 +147,107 @@ pub trait Accumulator: Send + Sync + Debug {
/// Note that [`ScalarValue::List`] can be used to pass multiple
/// values if the number of intermediate values is not known at
/// planning time (e.g. for `MEDIAN`)
///
/// # Multi-phase repartitioned Grouping
///
/// Many multi-phase grouping plans contain a Repartition operation
/// as well as shown below:
///
/// ```text
/// ▲ ▲
/// │ │
/// │ │
/// │ │
/// │ │
/// │ │
/// ┌───────────────────────┐ ┌───────────────────────┐ 4. Each AggregateMode::Final
/// │GroupBy │ │GroupBy │ GroupBy has an entry for its
/// │(AggregateMode::Final) │ │(AggregateMode::Final) │ subset of groups (in this case
/// │ │ │ │ that means half the entries)
/// └───────────────────────┘ └───────────────────────┘
/// ▲ ▲
/// │ │
/// └─────────────┬────────────┘
/// │
/// │
/// │
/// ┌─────────────────────────┐ 3. Repartitioning by hash(group
/// │ Repartition │ keys) ensures that each distinct
/// │ HASH(x) │ group key now appears in exactly
/// └─────────────────────────┘ one partition
/// ▲
/// │
/// ┌───────────────┴─────────────┐
/// │ │
/// │ │
/// ┌─────────────────────────┐ ┌──────────────────────────┐ 2. Each AggregateMode::Partial
/// │ GroubyBy │ │ GroubyBy │ GroupBy has an entry for *all*
/// │(AggregateMode::Partial) │ │ (AggregateMode::Partial) │ the groups
/// └─────────────────────────┘ └──────────────────────────┘
/// ▲ ▲
/// │ ┌┘
/// │ │
/// .─────────. .─────────.
/// ,─' '─. ,─' '─.
/// ; Input : ; Input : 1. Since input data is
/// : Partition 0 ; : Partition 1 ; arbitrarily or RoundRobin
/// ╲ ╱ ╲ ╱ distributed, each partition
/// '─. ,─' '─. ,─' likely has all distinct
/// `───────' `───────'
/// ```
///
/// This structure is used so that the `AggregateMode::Partial` accumulators
/// reduces the cardinality of the input as soon as possible. Typically,
/// each partial accumulator sees all groups in the input as the group keys
/// are evenly distributed across the input.
///
/// The final output is computed by repartitioning the result of
/// [`Self::state`] from each Partial aggregate and `hash(group keys)` so
/// that each distinct group key appears in exactly one of the
/// `AggregateMode::Final` GroupBy nodes. The output of the final nodes are
/// then unioned together to produce the overall final output.
///
/// Here is an example that shows the distribution of groups in the
/// different phases
///
/// ```text
/// ┌─────┐ ┌─────┐
/// │ 1 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 2 │ │ 4 │ After repartitioning by
/// └─────┘ └─────┘ hash(group keys), each distinct
/// ┌─────┐ ┌─────┐ group key now appears in exactly
/// │ 1 │ │ 3 │ one partition
/// ├─────┤ ├─────┤
/// │ 2 │ │ 4 │
/// └─────┘ └─────┘
///
///
/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///
/// ┌─────┐ ┌─────┐
/// │ 2 │ │ 2 │
/// ├─────┤ ├─────┤
/// │ 1 │ │ 2 │
/// ├─────┤ ├─────┤
/// │ 3 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 4 │ │ 1 │
/// └─────┘ └─────┘ Input data is arbitrarily or
/// ... ... RoundRobin distributed, each
/// ┌─────┐ ┌─────┐ partition likely has all
/// │ 1 │ │ 4 │ distinct group keys
/// ├─────┤ ├─────┤
/// │ 4 │ │ 3 │
/// ├─────┤ ├─────┤
/// │ 1 │ │ 1 │
/// ├─────┤ ├─────┤
/// │ 4 │ │ 3 │
/// └─────┘ └─────┘
///
/// group values group values
/// in partition 0 in partition 1
/// ```
fn state(&mut self) -> Result<Vec<ScalarValue>>;

/// Updates the accumulator's state from an `Array` containing one
Expand Down
58 changes: 48 additions & 10 deletions datafusion/expr/src/groups_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,18 +128,23 @@ pub trait GroupsAccumulator: Send {
/// Returns the intermediate aggregate state for this accumulator,
/// used for multi-phase grouping, resetting its internal state.
///
/// See [`Accumulator::state`] for more information on multi-phase
/// aggregation.
///
/// For example, `AVG` might return two arrays: `SUM` and `COUNT`
/// but the `MIN` aggregate would just return a single array.
///
/// Note more sophisticated internal state can be passed as
/// single `StructArray` rather than multiple arrays.
///
/// See [`Self::evaluate`] for details on the required output
/// order and `emit_to`.
/// order and `emit_to`.
///
/// [`Accumulator::state`]: crate::Accumulator::state
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

/// Merges intermediate state (the output from [`Self::state`])
/// into this accumulator's values.
/// into this accumulator's current state.
///
/// For some aggregates (such as `SUM`), `merge_batch` is the same
/// as `update_batch`, but for some aggregates (such as `COUNT`,
Expand All @@ -158,9 +163,41 @@ pub trait GroupsAccumulator: Send {
total_num_groups: usize,
) -> Result<()>;

/// Converts input batch to intermediate aggregate state,
/// without grouping (each input row considered as a separate
/// group).
/// Converts an input batch directly the intermediate aggregate state.
///
/// This is the equivalent of treating each input row as its own group. It
/// is invoked when the Partial phase of a multi-phase aggregation is not
/// reducing the cardinality enough to warrant spending more effort on
/// pre-aggregation (see `Background` section below), and switches to
/// passing intermediate state directly on to the next aggregation phase.
///
/// Examples:
/// * `COUNT`: an array of 1s for each row in the input batch.
/// * `SUM/MIN/MAX`: the input values themselves.
///
/// # Arguments
/// * `values`: the input arguments to the accumulator
/// * `opt_filter`: if present, any row where `opt_filter[i]` is false should be ignored
///
/// # Background
///
/// In a multi-phase aggregation (see [`Accumulator::state`]), the initial
/// Partial phase reduces the cardinality of the input data as soon as
/// possible in the plan.
///
/// This strategy is very effective for queries with a small number of
/// groups, as most of the data is aggregated immediately and only a small
/// amount of data must be repartitioned (see [`Accumulator::state`] for
/// background)
///
/// However, for queries with a large number of groups, the Partial phase
/// often does not reduce the cardinality enough to warrant the memory and
/// CPU cost of actually performing the aggregation. For such cases, the
/// HashAggregate operator will dynamically switch to passing intermediate
/// state directly to the next aggregation phase with minimal processing
/// using this method.
///
/// [`Accumulator::state`]: crate::Accumulator::state
fn convert_to_state(
&self,
_values: &[ArrayRef],
Expand All @@ -169,15 +206,16 @@ pub trait GroupsAccumulator: Send {
not_impl_err!("Input batch conversion to state not implemented")
}

/// Returns `true` is groups accumulator supports input batch
/// to intermediate aggregate state conversion (`convert_to_state`
/// method is implemented).
/// Returns `true` if [`Self::convert_to_state`] is implemented to support
/// intermediate aggregate state conversion.
fn supports_convert_to_state(&self) -> bool {
false
}

/// Amount of memory used to store the state of this accumulator,
/// in bytes. This function is called once per batch, so it should
/// be `O(n)` to compute, not `O(num_groups)`
/// in bytes.
///
/// This function is called once per batch, so it should be `O(n)` to
/// compute, not `O(num_groups)`
fn size(&self) -> usize;
}
4 changes: 3 additions & 1 deletion datafusion/expr/src/udaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,8 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {

/// Return the fields used to store the intermediate state of this accumulator.
///
/// See [`Accumulator::state`] for background information.
///
/// args: [`StateFieldsArgs`] contains arguments passed to the
/// aggregate function's accumulator.
///
Expand Down Expand Up @@ -388,7 +390,7 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
/// # Notes
///
/// Even if this function returns true, DataFusion will still use
/// `Self::accumulator` for certain queries, such as when this aggregate is
/// [`Self::accumulator`] for certain queries, such as when this aggregate is
/// used as a window function or when there no GROUP BY columns in the
/// query.
fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
Expand Down
5 changes: 5 additions & 0 deletions datafusion/functions-aggregate/src/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,11 @@ impl GroupsAccumulator for CountGroupsAccumulator {
Ok(vec![Arc::new(counts) as ArrayRef])
}

/// Converts an input batch directly to a state batch
///
/// The state of `COUNT` is always a single Int64Array:
/// * `1` (for non-null, non filtered values)
/// * `0` (for null values)
fn convert_to_state(
&self,
values: &[ArrayRef],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ where
self.update_batch(values, group_indices, opt_filter, total_num_groups)
}

/// Converts an input batch directly to a state batch
///
/// The state is:
/// - self.prim_fn for all non null, non filtered values
/// - null otherwise
///
fn convert_to_state(
&self,
values: &[ArrayRef],
Expand Down
18 changes: 15 additions & 3 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,20 @@ mod topk;
mod topk_stream;

/// Hash aggregate modes
///
/// See [`Accumulator::state`] for background information on multi-phase
/// aggregation and how these modes are used.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AggregateMode {
/// Partial aggregate that can be applied in parallel across input partitions
/// Partial aggregate that can be applied in parallel across input
/// partitions.
///
/// This is the first phase of a multi-phase aggregation.
Partial,
/// Final aggregate that produces a single partition of output
/// Final aggregate that produces a single partition of output by combining
/// the output of multiple partial aggregates.
///
/// This is the second phase of a multi-phase aggregation.
Final,
/// Final aggregate that works on pre-partitioned data.
///
Expand All @@ -72,12 +81,15 @@ pub enum AggregateMode {
/// Applies the entire logical aggregation operation in a single operator,
/// as opposed to Partial / Final modes which apply the logical aggregation using
/// two operators.
///
/// This mode requires that the input is a single partition (like Final)
Single,
/// Applies the entire logical aggregation operation in a single operator,
/// as opposed to Partial / Final modes which apply the logical aggregation using
/// two operators.
/// This mode requires that the input is partitioned by group key (like FinalPartitioned)
///
/// This mode requires that the input is partitioned by group key (like
/// FinalPartitioned)
SinglePartitioned,
}

Expand Down
35 changes: 29 additions & 6 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ pub(crate) enum ExecutionState {
/// When producing output, the remaining rows to output are stored
/// here and are sliced off as needed in batch_size chunks
ProducingOutput(RecordBatch),
/// Indicates that GroupedHashAggregateStream should produce
/// intermediate aggregate state for each input rows without
/// their aggregation
/// Produce intermediate aggregate state for each input row without
/// aggregation.
///
/// See "partial aggregation" discussion on [`GroupedHashAggregateStream`]
SkippingAggregation,
/// All input has been consumed and all groups have been emitted
Done,
}

Expand Down Expand Up @@ -94,6 +96,9 @@ struct SpillState {
merging_group_by: PhysicalGroupBy,
}

/// Tracks if the aggregate should skip partial aggregations
///
/// See "partial aggregation" discussion on [`GroupedHashAggregateStream`]
struct SkipAggregationProbe {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI @kazuyukitanimura -- I wonder if you have time to review this change to hash aggregate spilling as you originally contributed #7400

Context:

/// Number of processed input rows
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Number of processed input rows
/// Number of processed input rows (updates during probing)

input_rows: usize,
Expand Down Expand Up @@ -204,7 +209,7 @@ impl SkipAggregationProbe {
/// of `x` and one accumulator for `SUM(y)`, specialized for the data
/// type of `y`.
///
/// # Description
/// # Discussion
///
/// [`group_values`] does not store any aggregate state inline. It only
/// assigns "group indices", one for each (distinct) group value. The
Expand All @@ -222,7 +227,25 @@ impl SkipAggregationProbe {
///
/// [`group_values`]: Self::group_values
///
/// # Spilling
/// # Partial Aggregate and multi-phase grouping
///
/// As described on [`Accumulator::state`], this operator is used in the context
/// "multi-phase" grouping when the mode is [`AggregateMode::Partial`].
///
/// An important optimization for multi-phase partial aggregation is to skip
/// partial aggregation when it is not effective enough to warrant the memory or
/// CPU cost, as is often the case for queries many distinct groups (high
/// cardinality group by). Memory is particularly important because each Partial
/// aggregator must store the intermediate state for each group.
///
/// If the ratio of the number of groups to the number of input rows exceeds a
/// threshold, and [`GroupsAccumulator::supports_convert_to_state`] is
/// supported, this operator will stop applying Partial aggregation and directly
/// pass the input rows to the next aggregation phase.
///
/// [`Accumulator::state`]: datafusion_expr::Accumulator::state
///
/// # Spilling (to disk)
///
/// The sizes of group values and accumulators can become large. Before that causes out of memory,
/// this hash aggregator outputs partial states early for partial aggregation or spills to local
Expand Down Expand Up @@ -344,7 +367,7 @@ pub(crate) struct GroupedHashAggregateStream {
group_values_soft_limit: Option<usize>,

/// Optional probe for skipping data aggregation, if supported by
/// current stream
/// current stream.
skip_aggregation_probe: Option<SkipAggregationProbe>,
}

Expand Down