Add OpenCloseIterator and refactor LocalNode to make it reusable
zsxwing committed Aug 26, 2015
commit 4dc2583bbbbc22ed4751a1e89cfa4521d19f2fa2
FilterNode.scala
@@ -24,24 +24,28 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate

 case class FilterNode(condition: Expression, child: LocalNode) extends UnaryLocalNode {

-  private[this] var predicate: (InternalRow) => Boolean = _
-
   override def output: Seq[Attribute] = child.output

-  override def open(): Unit = {
-    child.open()
-    predicate = GeneratePredicate.generate(condition, child.output)
-  }
+  override def execute(): OpenCloseRowIterator = new OpenCloseRowIterator {
+
+    private val childIter = child.execute()
+
+    private val predicate = GeneratePredicate.generate(condition, child.output)
+
+    override def open(): Unit = childIter.open()

-  override def next(): Boolean = {
-    var found = false
-    while (child.next() && !found) {
-      found = predicate.apply(child.get())
+    override def close(): Unit = childIter.close()
+
+    override def getRow: InternalRow = childIter.getRow
+
+    override def advanceNext(): Boolean = {
+      var found = false
+      while (!found && childIter.advanceNext()) {
+        found = predicate.apply(childIter.getRow)
+      }
+      found
     }
-    found
   }
-
-  override def get(): InternalRow = child.get()
-
-  override def close(): Unit = child.close()
 }
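For illustration only (not part of this commit): a minimal sketch of driving the refactored FilterNode through LocalNode.collect(), assuming the usual Catalyst constructors (AttributeReference, Literal, GreaterThan) and InternalRow.apply behave as in Spark 1.5.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal}
import org.apache.spark.sql.types.IntegerType

// Hypothetical driver code (not in this commit): a one-column scan filtered by a > 3.
val a = AttributeReference("a", IntegerType, nullable = false)()
val scan = SeqScanNode(Seq(a), (1 to 5).map(i => InternalRow(i)))
val filter = FilterNode(GreaterThan(a, Literal(3)), scan)

// collect() (see LocalNode below) opens the iterator, drains it, and closes it,
// so the caller never touches open()/advanceNext()/close() directly.
val rows = filter.collect()  // expected: rows for a = 4 and a = 5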
LocalNode.scala
@@ -34,40 +34,25 @@ abstract class LocalNode extends TreeNode[LocalNode]

   def output: Seq[Attribute]

   /**
-   * Initializes the iterator state. Must be called before calling `next()`.
-   *
-   * Implementations of this must also call the `open()` function of its children.
+   * Returns the result of this query as an OpenCloseRowIterator.
    */
-  def open(): Unit
+  def execute(): OpenCloseRowIterator

-  /**
-   * Advances the iterator to the next tuple. Returns true if there is at least one more tuple.
-   */
-  def next(): Boolean
-
-  /**
-   * Returns the current tuple.
-   */
-  def get(): InternalRow
-
-  /**
-   * Closes the iterator and releases all resources.
-   *
-   * Implementations of this must also call the `close()` function of its children.
-   */
-  def close(): Unit
-
   /**
-   * Returns the content of the iterator from the beginning to the end in the form of a Scala Seq.
+   * Execute the query and collect all results in the form of a Scala Seq.
    */
   def collect(): Seq[Row] = {
     val converter = CatalystTypeConverters.createToScalaConverter(StructType.fromAttributes(output))
     val result = new scala.collection.mutable.ArrayBuffer[Row]
-    open()
-    while (next()) {
-      result += converter.apply(get()).asInstanceOf[Row]
+    val iter = execute()
+    iter.open()
+    try {
+      while (iter.advanceNext()) {
+        result += converter.apply(iter.getRow).asInstanceOf[Row]
+      }
+    } finally {
+      iter.close()
     }
-    close()
     result
   }
 }
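Because execute() now returns a self-contained iterator, the open/advanceNext/close protocol that collect() uses can be reused by other consumers. A hypothetical helper (not part of this commit) that counts rows without materializing them:

// Hypothetical helper object (not in this commit): reuses the same iterator protocol as collect().
object LocalNodeUtils {
  def countRows(node: LocalNode): Long = {
    val iter = node.execute()
    iter.open()
    try {
      var n = 0L
      while (iter.advanceNext()) {
        n += 1
      }
      n
    } finally {
      // Always release resources, even if advanceNext() throws.
      iter.close()
    }
  }
}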
OpenCloseRowIterator.scala (new file)
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.execution.RowIterator

/**
 * A special RowIterator with two extra methods, `open` and `close`, which can be used to
 * manage resources held by this iterator.
 */
private[sql] abstract class OpenCloseRowIterator extends RowIterator {

  /**
   * Initializes the iterator state. Must be called before using this iterator.
   *
   * Implementations of this must also call the `open()` function of its children.
   */
  def open(): Unit

  /**
   * Closes the iterator and releases all resources.
   *
   * Implementations of this must also call the `close()` function of its children.
   */
  def close(): Unit
}
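For illustration (not part of this commit): a sketch of a concrete OpenCloseRowIterator whose open() acquires a resource and whose close() releases it, which is the situation these two extra methods are meant for. The acquire and release parameters are hypothetical.

package org.apache.spark.sql.execution.local

import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical example (not in this commit): rows produced from a resource that must
// be explicitly released, such as a file handle or a connection.
private[sql] class ResourceRowIterator(
    acquire: () => Iterator[InternalRow],  // hypothetical: obtains rows and holds a resource
    release: () => Unit)                   // hypothetical: frees that resource
  extends OpenCloseRowIterator {

  private var rows: Iterator[InternalRow] = _
  private var currentRow: InternalRow = _

  // Acquire the underlying resource; must be called before advanceNext().
  override def open(): Unit = {
    rows = acquire()
  }

  // Release the underlying resource once the caller is done with the rows.
  override def close(): Unit = release()

  override def getRow: InternalRow = currentRow

  override def advanceNext(): Boolean = {
    if (rows.hasNext) {
      currentRow = rows.next()
      true
    } else {
      false
    }
  }
}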
ProjectNode.scala
@@ -23,20 +23,21 @@ import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, Attribute, NamedExpression}

 case class ProjectNode(projectList: Seq[NamedExpression], child: LocalNode) extends UnaryLocalNode {

-  private[this] var project: UnsafeProjection = _
-
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)

-  override def open(): Unit = {
-    project = UnsafeProjection.create(projectList, child.output)
-    child.open()
-  }
+  override def execute(): OpenCloseRowIterator = new OpenCloseRowIterator {

-  override def next(): Boolean = child.next()
+    private val project = UnsafeProjection.create(projectList, child.output)

-  override def get(): InternalRow = {
-    project.apply(child.get())
-  }
+    private val childIter = child.execute()
+
+    override def open(): Unit = childIter.open()

-  override def close(): Unit = child.close()
+    override def close(): Unit = childIter.close()
+
+    override def getRow: InternalRow = project.apply(childIter.getRow)
+
+    override def advanceNext(): Boolean = childIter.advanceNext()
+
+  }
 }
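As with FilterNode, a brief usage sketch (hypothetical, not part of this commit): projecting a + 1 AS b over an in-memory scan, again assuming the standard Catalyst expression constructors.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Add, Alias, AttributeReference, Literal}
import org.apache.spark.sql.types.IntegerType

// Hypothetical usage (not in this commit): compute a + 1 AS b for each input row.
val a = AttributeReference("a", IntegerType, nullable = false)()
val scan = SeqScanNode(Seq(a), Seq(InternalRow(1), InternalRow(2)))
val project = ProjectNode(Seq(Alias(Add(a, Literal(1)), "b")()), scan)
val rows = project.collect()  // expected: b = 2 and b = 3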
SeqScanNode.scala
@@ -25,25 +25,30 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
  */
 case class SeqScanNode(output: Seq[Attribute], data: Seq[InternalRow]) extends LeafLocalNode {

-  private[this] var iterator: Iterator[InternalRow] = _
-  private[this] var currentRow: InternalRow = _
+  override def execute(): OpenCloseRowIterator = new OpenCloseRowIterator {

-  override def open(): Unit = {
-    iterator = data.iterator
-  }
+    private var iter: Iterator[InternalRow] = _
[Inline review comment from a contributor on the line above: Why not just assign the value here (then it could be a val)? The changes to FilterNode and ProjectNode also assign predicate/project immediately rather than in open().]
+
+    private var currentRow: InternalRow = _

-  override def next(): Boolean = {
-    if (iterator.hasNext) {
-      currentRow = iterator.next()
-      true
-    } else {
-      false
+    override def open(): Unit = {
+      iter = data.iterator
     }
-  }

-  override def get(): InternalRow = currentRow
+    override def close(): Unit = {
+      // Do nothing
+    }

-  override def close(): Unit = {
-    // Do nothing
+    override def getRow: InternalRow = currentRow
+
+    override def advanceNext(): Boolean = {
+      if (iter.hasNext) {
+        currentRow = iter.next()
+        true
+      } else {
+        false
+      }
+    }
   }
+
 }
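The inline review comment above asks why iter is not assigned eagerly. A sketch of that suggested variant (hypothetical, not part of this commit), where the field becomes a val and open() and close() have nothing left to do:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute

// Hypothetical variant (not in this commit) following the review suggestion:
// the iterator is created eagerly when execute() is called, so a val suffices.
case class EagerSeqScanNode(output: Seq[Attribute], data: Seq[InternalRow]) extends LeafLocalNode {

  override def execute(): OpenCloseRowIterator = new OpenCloseRowIterator {

    private val iter = data.iterator

    private var currentRow: InternalRow = _

    // Nothing to initialize: the iterator already exists.
    override def open(): Unit = {}

    // Nothing to release.
    override def close(): Unit = {}

    override def getRow: InternalRow = currentRow

    override def advanceNext(): Boolean = {
      if (iter.hasNext) {
        currentRow = iter.next()
        true
      } else {
        false
      }
    }
  }
}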