addressed comments v2.0
brkyvz committed Apr 30, 2015
commit 3a5c177e247ddb44a38e4ee4211c57ec3cad58eb
DataFrameStatFunctions.scala
@@ -33,7 +33,8 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
* [[http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou]].
*
Contributor: make sure you document the range of support allowed.

* @param cols the names of the columns to search frequent items in
* @param support The minimum frequency for an item to be considered `frequent`
* @param support The minimum frequency for an item to be considered `frequent`. Should be greater
* than 1e-4.
* @return A Local DataFrame with the Array of frequent items for each column.
*/
def freqItems(cols: Seq[String], support: Double): DataFrame = {
Contributor: don't forget to add java.util.List ones
Contributor: also make sure you add a test to the JavaDataFrameSuite
Contributor: mention struct when #5802 is merged.
@@ -44,12 +45,39 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) {
* Finding frequent items for columns, possibly with false positives. Using the
* frequent element count algorithm described in
* [[http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou]].
* Returns items more frequent than 1/1000'th of the time.
* Returns items more frequent than 1 percent of the time.
*
* @param cols the names of the columns to search frequent items in
* @return A Local DataFrame with the Array of frequent items for each column.
*/
def freqItems(cols: Seq[String]): DataFrame = {
FrequentItems.singlePassFreqItems(df, cols, 0.001)
FrequentItems.singlePassFreqItems(df, cols, 0.01)
}

/**
* Finding frequent items for columns, possibly with false positives. Using the
* frequent element count algorithm described in
* [[http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou]].
*
* @param cols the names of the columns to search frequent items in
* @param support The minimum frequency for an item to be considered `frequent`. Should be greater
* than 1e-4.
* @return A Local DataFrame with the Array of frequent items for each column.
*/
def freqItems(cols: List[String], support: Double): DataFrame = {
Contributor: I think we can just use Seq here, since Python has helper functions that can convert List into Seq.
FrequentItems.singlePassFreqItems(df, cols, support)
}

/**
* Finding frequent items for columns, possibly with false positives. Using the
* frequent element count algorithm described in
* [[http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou]].
* Returns items more frequent than 1 percent of the time.
*
* @param cols the names of the columns to search frequent items in
* @return A Local DataFrame with the Array of frequent items for each column.
*/
def freqItems(cols: List[String]): DataFrame = {
FrequentItems.singlePassFreqItems(df, cols, 0.01)
}
}
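For context, a minimal usage sketch of the `Seq`-based overloads this file now exposes; the toy data, column names, and variable names are invented for illustration and are not part of the PR:

```scala
// Hypothetical usage of DataFrameStatFunctions.freqItems (toy data).
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext.implicits._

val df = TestSQLContext.sparkContext
  .parallelize(Seq((1, "a"), (1, "a"), (2, "b"), (1, "c")))
  .toDF("id", "tag")

// Explicit support: report items appearing in at least 40% of rows.
val explicit = df.stat.freqItems(Seq("id", "tag"), 0.4)

// Default support, which this commit raises from 0.001 to 0.01.
val withDefault = df.stat.freqItems(Seq("id", "tag"))
explicit.show()
```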
FrequentItems.scala
@@ -75,16 +75,15 @@ private[sql] object FrequentItems extends Logging {
*
* @param df The input DataFrame
* @param cols the names of the columns to search frequent items in
* @param support The minimum frequency for an item to be considered `frequent`
* @param support The minimum frequency for an item to be considered `frequent`. Should be greater
* than 1e-4.
* @return A Local DataFrame with the Array of frequent items for each column.
*/
private[sql] def singlePassFreqItems(
df: DataFrame,
cols: Seq[String],
support: Double): DataFrame = {
if (support < 1e-6) {
logWarning(s"The selected support ($support) is too small, and might cause memory problems.")
}
require(support >= 1e-4, s"support ($support) must be at least 1e-4.")
val numCols = cols.length
// number of max items to keep counts for
val sizeOfMap = (1 / support).toInt
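To ground the `sizeOfMap = (1 / support).toInt` line above, here is a hedged sketch of the single-pass counting scheme from the cited Karp, Schenker, and Papadimitriou paper; `approxFreqItems` is an invented name and this is an illustration of the algorithm, not the PR's actual implementation:

```scala
// Illustrative sketch of the frequent-element counting algorithm
// (Karp, Schenker, and Papadimitriou); not the PR's code.
import scala.collection.mutable

def approxFreqItems[T](items: Iterator[T], support: Double): Seq[T] = {
  require(support >= 1e-4, s"support ($support) must be at least 1e-4.")
  // At most 1/support counters are needed, so the 1e-4 floor
  // caps the map at 10,000 entries.
  val sizeOfMap = (1 / support).toInt
  val counts = mutable.Map.empty[T, Long]
  items.foreach { item =>
    if (counts.contains(item)) {
      counts(item) += 1              // known candidate: bump its count
    } else if (counts.size < sizeOfMap) {
      counts(item) = 1L              // room left: track a new candidate
    } else {
      // No room: decrement every counter and evict those that reach zero.
      // Any item occurring in more than a `support` fraction of the input
      // is guaranteed to survive (false positives are possible).
      for (k <- counts.keys.toSeq) {
        if (counts(k) == 1L) counts -= k else counts(k) -= 1
      }
    }
  }
  counts.keys.toSeq
}
```

This is why the new `require` guard matters: a tiny support value blows up the counter map, which is exactly the memory problem the old `logWarning` only hinted at.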
JavaDataFrameSuite.java
@@ -22,10 +22,7 @@

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.TestData$;
import org.apache.spark.sql.*;
import org.apache.spark.sql.test.TestSQLContext;
import org.apache.spark.sql.test.TestSQLContext$;
import org.apache.spark.sql.types.*;
@@ -36,6 +33,7 @@
import scala.collection.mutable.Buffer;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -178,4 +176,13 @@ public void testCreateDataFrameFromJavaBeans() {
Assert.assertEquals(bean.getD().get(i), d.apply(i));
}
}

@Test
public void testFrequentItems() {
DataFrame df = context.table("testData2");
List<String> cols = Arrays.asList("a");
DataFrame results = df.stat().freqItems(JavaConversions.asScalaIterable(cols).toList(), 0.2);
Row[] rows = results.collect();
Assert.assertTrue(rows[0].getSeq(0).contains(1));
}
}
DataFrameStatSuite.scala
@@ -17,27 +17,31 @@

package org.apache.spark.sql

import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.types._
import org.scalatest.FunSuite
import org.scalatest.Matchers._

import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext.implicits._

class DataFrameStatSuite extends FunSuite {

val sqlCtx = TestSQLContext

test("Frequent Items") {
def toLetter(i: Int): String = (i + 96).toChar.toString
val rows = Array.tabulate(1000)(i => if (i % 3 == 0) (1, toLetter(1)) else (i, toLetter(i)))
val rowRdd = sqlCtx.sparkContext.parallelize(rows.map(v => Row(v._1, v._2)))
val schema = StructType(StructField("numbers", IntegerType, false) ::
StructField("letters", StringType, false) :: Nil)
val df = sqlCtx.createDataFrame(rowRdd, schema)
val rows = Array.tabulate(1000) { i =>
if (i % 3 == 0) (1, toLetter(1), -1.0) else (i, toLetter(i), i * -1.0)
}
val df = sqlCtx.sparkContext.parallelize(rows).toDF("numbers", "letters", "negDoubles")

val results = df.stat.freqItems(Array("numbers", "letters"), 0.1)
val items = results.collect().head
assert(items.getSeq(0).contains(1),
"1 should be the frequent item for column 'numbers")
assert(items.getSeq(1).contains(toLetter(1)),
s"${toLetter(1)} should be the frequent item for column 'letters'")
items.getSeq[Int](0) should contain (1)
items.getSeq[String](1) should contain (toLetter(1))

val singleColResults = df.stat.freqItems(Array("negDoubles"), 0.1)
val items2 = singleColResults.collect().head
items2.getSeq[Double](0) should contain (-1.0)
}
}