From d52c2e60f4a5d8ea78f723b3fda01cc750e98ad5 Mon Sep 17 00:00:00 2001
From: KristofferC
Date: Tue, 30 Sep 2025 13:58:14 +0200
Subject: [PATCH] add the option to disable the part of threading in
 `Arrow.Table` that leads to catastrophic negative scaling

---
 src/table.jl       | 25 +++++++++++++++++--------
 test/testtables.jl |  9 +++++++++
 2 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/src/table.jl b/src/table.jl
index fe9206b4..8b9c210c 100644
--- a/src/table.jl
+++ b/src/table.jl
@@ -289,10 +289,10 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0))
 end
 
 """
-    Arrow.Table(io::IO; convert::Bool=true)
-    Arrow.Table(file::String; convert::Bool=true)
-    Arrow.Table(bytes::Vector{UInt8}, pos=1, len=nothing; convert::Bool=true)
-    Arrow.Table(inputs::Vector; convert::Bool=true)
+    Arrow.Table(io::IO; convert::Bool=true, threaded::Bool=true)
+    Arrow.Table(file::String; convert::Bool=true, threaded::Bool=true)
+    Arrow.Table(bytes::Vector{UInt8}, pos=1, len=nothing; convert::Bool=true, threaded::Bool=true)
+    Arrow.Table(inputs::Vector; convert::Bool=true, threaded::Bool=true)
 
 Read an arrow formatted table, from:
 * `io`, bytes will be read all at once via `read(io)`
@@ -311,6 +311,8 @@ sink function: e.g. `DataFrame(Arrow.Table(file))`, `SQLite.load!(db, "table", A
 Supports the `convert` keyword argument which controls whether certain arrow
 primitive types will be lazily converted to more friendly Julia defaults;
 by default, `convert=true`.
+
+Set `threaded=false` to disable parallel batch processing, which can avoid lock contention for tables with many small batches.
 """
 struct Table <: Tables.AbstractColumns
     names::Vector{Symbol}
@@ -465,7 +467,7 @@ Table(inputs::Vector; kw...) =
     Table([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)
 
 # will detect whether we're reading a Table from a file or stream
-function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
+function Table(blobs::Vector{ArrowBlob}; convert::Bool=true, threaded::Bool=true)
     t = Table()
     sch = nothing
     dictencodingslockable = Lockable(Dict{Int64,DictEncoding}()) # dictionary id => DictEncoding
@@ -580,10 +582,17 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
         elseif header isa Meta.RecordBatch
             anyrecordbatches = true
             @debug "parsing record batch message: compression = $(header.compression)"
-            @wkspawn begin
+            if threaded
+                @wkspawn begin
+                    cols = collect(
+                        VectorIterator(sch, $batch, dictencodingslockable, convert),
+                    )
+                    put!(() -> put!(tsks, cols), sync, $(rbi))
+                end
+            else
                 cols =
-                    collect(VectorIterator(sch, $batch, dictencodingslockable, convert))
-                put!(() -> put!(tsks, cols), sync, $(rbi))
+                    collect(VectorIterator(sch, batch, dictencodingslockable, convert))
+                put!(tsks, cols)
             end
             rbi += 1
         else
diff --git a/test/testtables.jl b/test/testtables.jl
index 1ee54045..e4ddd551 100644
--- a/test/testtables.jl
+++ b/test/testtables.jl
@@ -310,6 +310,15 @@ function testtable(nm, t, writekw, readkw, extratests)
     @test all(isequal.(values(t), values(tt)))
     extratests !== nothing && extratests(tt)
     seekstart(io)
+
+    # Explicitly unthreaded
+    io = Arrow.tobuffer(t; writekw...)
+    tt = Arrow.Table(io; threaded=false, readkw...)
+    @test length(tt) == length(t)
+    @test all(isequal.(values(t), values(tt)))
+    extratests !== nothing && extratests(tt)
+    seekstart(io)
+
     str = Arrow.Stream(io; readkw...)
     tt = first(str)
     @test length(tt) == length(t)
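-- 
Usage sketch (illustrative only, not part of the commit): how the new keyword
is meant to be used. The file name `data.arrow` is a placeholder; the
motivating case is any Arrow file made up of many small record batches.

    using Arrow

    # Default: a task is spawned per record batch and the decoded columns
    # are merged back in batch order; this wins for a few large batches.
    tbl = Arrow.Table("data.arrow")

    # With threaded=false, batches are decoded sequentially on the calling
    # task, avoiding the task-spawn and lock-contention overhead that causes
    # the negative scaling this patch targets.
    tbl_serial = Arrow.Table("data.arrow"; threaded=false)

    # Both reads produce the same table.
    @assert length(tbl) == length(tbl_serial)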
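Background on why the two branches differ (the sketch below is simplified
stand-in code, not Arrow's): in the threaded branch, `sync` is an
`OrderedSynchronizer` from ConcurrentUtilities.jl, and `put!(f, sync, i)`
blocks until it is batch `i`'s turn before running `f`, so tasks finishing
out of order still deliver their columns to `tsks` in record-batch order.
The serial branch decodes in order by construction, so it skips the
synchronizer and calls `put!(tsks, cols)` directly.

    using ConcurrentUtilities: OrderedSynchronizer

    results = Channel{Int}(Inf)
    sync = OrderedSynchronizer()

    @sync for i in 1:8
        Threads.@spawn begin
            x = i^2  # stand-in for collect(VectorIterator(...))
            # The closure runs only when it is i's turn, so `results`
            # receives 1, 4, 9, ... regardless of which task finishes first.
            put!(() -> put!(results, x), sync, i)
        end
    end
    close(results)
    @assert collect(results) == [i^2 for i in 1:8]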