Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Modify to close resources also immediately after row iterator is cons…
…umed.
  • Loading branch information
ueshin committed Jul 20, 2017
commit 7084b388d87c8347b79898827658d7827bf5649d
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,23 @@ private[sql] object ArrowConverters {
val root = VectorSchemaRoot.create(arrowSchema, allocator)
val arrowWriter = ArrowWriter.create(root)

var closed = false

context.addTaskCompletionListener { _ =>
root.close()
allocator.close()
if (!closed) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need this? I think it's ok to close twice?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The allocator can be closed twice, but the root throws an exception after allocator is closed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a bug in arrow? cc @BryanCutler

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The root just releases the buffers from the FieldVectors, so I would think it should be able to handle being closed twice. I'll check tomorrow if seems reasonable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed https://issues.apache.org/jira/browse/ARROW-1283 to fix this. For now, it looks like we need this.

root.close()
allocator.close()
}
}

new Iterator[ArrowPayload] {

override def hasNext: Boolean = rowIter.hasNext
override def hasNext: Boolean = rowIter.hasNext || {
root.close()
allocator.close()
closed = true
false
}

override def next(): ArrowPayload = {
val out = new ByteArrayOutputStream()
Expand Down