diff --git a/src/table.jl b/src/table.jl index fe9206b..8b9c210 100644 --- a/src/table.jl +++ b/src/table.jl @@ -289,10 +289,10 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0)) end """ - Arrow.Table(io::IO; convert::Bool=true) - Arrow.Table(file::String; convert::Bool=true) - Arrow.Table(bytes::Vector{UInt8}, pos=1, len=nothing; convert::Bool=true) - Arrow.Table(inputs::Vector; convert::Bool=true) + Arrow.Table(io::IO; convert::Bool=true, threaded::Bool=true) + Arrow.Table(file::String; convert::Bool=true, threaded::Bool=true) + Arrow.Table(bytes::Vector{UInt8}, pos=1, len=nothing; convert::Bool=true, threaded::Bool=true) + Arrow.Table(inputs::Vector; convert::Bool=true, threaded::Bool=true) Read an arrow formatted table, from: * `io`, bytes will be read all at once via `read(io)` @@ -311,6 +311,8 @@ sink function: e.g. `DataFrame(Arrow.Table(file))`, `SQLite.load!(db, "table", A Supports the `convert` keyword argument which controls whether certain arrow primitive types will be lazily converted to more friendly Julia defaults; by default, `convert=true`. + +Set `threaded=false` to disable parallel batch processing, which can avoid lock contention for tables with many small batches. """ struct Table <: Tables.AbstractColumns names::Vector{Symbol} @@ -465,7 +467,7 @@ Table(inputs::Vector; kw...) = Table([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...) # will detect whether we're reading a Table from a file or stream -function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) +function Table(blobs::Vector{ArrowBlob}; convert::Bool=true, threaded::Bool=true) t = Table() sch = nothing dictencodingslockable = Lockable(Dict{Int64,DictEncoding}()) # dictionary id => DictEncoding @@ -580,10 +582,17 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) elseif header isa Meta.RecordBatch anyrecordbatches = true @debug "parsing record batch message: compression = $(header.compression)" - @wkspawn begin + if threaded + @wkspawn begin + cols = collect( + VectorIterator(sch, $batch, dictencodingslockable, convert), + ) + put!(() -> put!(tsks, cols), sync, $(rbi)) + end + else cols = - collect(VectorIterator(sch, $batch, dictencodingslockable, convert)) - put!(() -> put!(tsks, cols), sync, $(rbi)) + collect(VectorIterator(sch, batch, dictencodingslockable, convert)) + put!(tsks, cols) end rbi += 1 else diff --git a/test/testtables.jl b/test/testtables.jl index 1ee5404..e4ddd55 100644 --- a/test/testtables.jl +++ b/test/testtables.jl @@ -310,6 +310,15 @@ function testtable(nm, t, writekw, readkw, extratests) @test all(isequal.(values(t), values(tt))) extratests !== nothing && extratests(tt) seekstart(io) + + # Explicitly unthreaded + io = Arrow.tobuffer(t; writekw...) + tt = Arrow.Table(io; threaded=false, readkw...) + @test length(tt) == length(t) + @test all(isequal.(values(t), values(tt))) + extratests !== nothing && extratests(tt) + seekstart(io) + str = Arrow.Stream(io; readkw...) tt = first(str) @test length(tt) == length(t)