Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
merge master
  • Loading branch information
realno committed Jan 11, 2022
commit f9c5bace415ac144ba0c9a4256883a295b356e41
1 change: 1 addition & 0 deletions datafusion/src/physical_plan/expressions/stddev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::physical_plan::{
use crate::scalar::ScalarValue;
use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field};


use super::{format_state_name, StatsType};

/// STDDEV and STDDEV_SAMP (standard deviation) aggregate expression
Expand Down
115 changes: 107 additions & 8 deletions datafusion/src/physical_plan/expressions/variance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,6 @@ impl Accumulator for VarianceAccumulator {
])
}

fn update(&mut self, _values: &[ScalarValue]) -> Result<()> {
Ok(())
}

fn merge(&mut self, _states: &[ScalarValue]) -> Result<()> {
Ok(())
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = &cast(&values[0], &DataType::Float64)?;
let arr = values.as_any().downcast_ref::<Float64Array>().unwrap();
Expand Down Expand Up @@ -306,6 +298,113 @@ impl Accumulator for VarianceAccumulator {
self.mean = new_mean;
self.m2 = new_m2;
}
Ok(())
}

fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
let values = &values[0];
let is_empty = values.is_null();
let mean = ScalarValue::from(self.mean);
let m2 = ScalarValue::from(self.m2);

if !is_empty {
let new_count = self.count + 1;
let delta1 = ScalarValue::add(values, &mean.arithmetic_negate())?;
let new_mean = ScalarValue::add(
&ScalarValue::div(&delta1, &ScalarValue::from(new_count as f64))?,
&mean,
)?;
let delta2 = ScalarValue::add(values, &new_mean.arithmetic_negate())?;
let tmp = ScalarValue::mul(&delta1, &delta2)?;

let new_m2 = ScalarValue::add(&m2, &tmp)?;
self.count += 1;

if let ScalarValue::Float64(Some(c)) = new_mean {
self.mean = c;
} else {
unreachable!()
};
if let ScalarValue::Float64(Some(m)) = new_m2 {
self.m2 = m;
}else {
unreachable!()
};
}

Ok(())
}

fn merge(&mut self, states: &[ScalarValue]) -> Result<()> {
let count;
let mean;
let m2;
let mut new_count: u64 = self.count;

if let ScalarValue::UInt64(Some(c)) = states[0] {
count = c;
} else {
unreachable!()
};

if count == 0_u64 {
return Ok(());
}

if let ScalarValue::Float64(Some(m)) = states[1] {
mean = m;
} else {
unreachable!()
};
if let ScalarValue::Float64(Some(n)) = states[2] {
m2 = n;
}else {
unreachable!()
};

if self.count == 0 {
self.count = count;
self.mean = mean;
self.m2 = m2;
return Ok(());
}

new_count += count;

let mean1 = ScalarValue::from(self.mean);
let mean2 = ScalarValue::from(mean);

let new_mean = ScalarValue::div(
&ScalarValue::add(&mean1, &mean2)?,
&ScalarValue::from(2_f64),
)?;
let delta = ScalarValue::add(&mean2.arithmetic_negate(), &mean1)?;
let delta_sqrt = ScalarValue::mul(&delta, &delta)?;
let new_m2 = ScalarValue::add(
&ScalarValue::add(
&ScalarValue::mul(
&delta_sqrt,
&ScalarValue::div(
&ScalarValue::mul(&ScalarValue::from(self.count), &ScalarValue::from(count))?,
&ScalarValue::from(new_count as f64),
)?,
)?,
&ScalarValue::from(self.m2),
)?,
&ScalarValue::from(m2),
)?;

self.count = new_count;
if let ScalarValue::Float64(Some(c)) = new_mean {
self.mean = c;
} else {
unreachable!()
};
if let ScalarValue::Float64(Some(m)) = new_m2 {
self.m2 = m;
}else {
unreachable!()
};

Ok(())
}
Expand Down
Loading
You are viewing a condensed version of this merge commit. You can view the full changes here.