Skip to content

Commit c011888

Browse files
authored
Fix poor performance of table reading when many record batches are involved (#570)
Fixes #528. Alternative to #568 cc: @KristofferC
1 parent c75d0e9 commit c011888

File tree

2 files changed

+61
-30
lines changed

2 files changed

+61
-30
lines changed

src/table.jl

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -470,28 +470,14 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
470470
sch = nothing
471471
dictencodingslockable = Lockable(Dict{Int64,DictEncoding}()) # dictionary id => DictEncoding
472472
dictencoded = Dict{Int64,Meta.Field}() # dictionary id => field
473-
sync = OrderedSynchronizer()
474-
tsks = Channel{Any}(Inf)
475-
tsk = @wkspawn begin
476-
i = 1
477-
for cols in tsks
478-
if i == 1
479-
foreach(x -> push!(columns(t), x), cols)
480-
elseif i == 2
481-
foreach(1:length(cols)) do i
482-
columns(t)[i] = ChainedVector([columns(t)[i], cols[i]])
483-
end
484-
else
485-
foreach(1:length(cols)) do i
486-
append!(columns(t)[i], cols[i])
487-
end
488-
end
489-
i += 1
490-
end
491-
end
492-
anyrecordbatches = false
473+
# we'll grow/add a record batch set of columns as they're constructed
474+
# must be holding the lock while growing/adding
475+
# starts at 0-length because we don't know how many record batches there will be
476+
rb_cols = []
477+
rb_cols_lock = ReentrantLock()
493478
rbi = 1
494-
@sync for blob in blobs
479+
tasks = Task[]
480+
for blob in blobs
495481
for batch in BatchIterator(blob)
496482
# store custom_metadata of batch.msg?
497483
header = batch.msg.header
@@ -578,30 +564,49 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
578564
end # lock
579565
@debug "parsed dictionary batch message: id=$id, data=$values\n"
580566
elseif header isa Meta.RecordBatch
581-
anyrecordbatches = true
582567
@debug "parsing record batch message: compression = $(header.compression)"
583-
@wkspawn begin
584-
cols =
585-
collect(VectorIterator(sch, $batch, dictencodingslockable, convert))
586-
put!(() -> put!(tsks, cols), sync, $(rbi))
587-
end
568+
push!(
569+
tasks,
570+
collect_cols!(
571+
rbi,
572+
rb_cols_lock,
573+
rb_cols,
574+
sch,
575+
batch,
576+
dictencodingslockable,
577+
convert,
578+
),
579+
)
588580
rbi += 1
589581
else
590582
throw(ArgumentError("unsupported arrow message type: $(typeof(header))"))
591583
end
592584
end
593585
end
594-
close(tsks)
595-
wait(tsk)
586+
_waitall(tasks)
596587
lu = lookup(t)
597588
ty = types(t)
598589
# 158; some implementations may send 0 record batches
599-
if !anyrecordbatches && !isnothing(sch)
590+
# no more multithreading, so no need to take the lock now
591+
if length(rb_cols) == 0 && !isnothing(sch)
600592
for field in sch.fields
601593
T = juliaeltype(field, buildmetadata(field), convert)
602594
push!(columns(t), T[])
603595
end
604596
end
597+
if length(rb_cols) > 0
598+
foreach(x -> push!(columns(t), x), rb_cols[1])
599+
end
600+
if length(rb_cols) > 1
601+
foreach(enumerate(rb_cols[2])) do (i, x)
602+
columns(t)[i] = ChainedVector([columns(t)[i], x])
603+
end
604+
foreach(3:length(rb_cols)) do j
605+
foreach(enumerate(rb_cols[j])) do (i, x)
606+
append!(columns(t)[i], x)
607+
end
608+
end
609+
end
605610
for (nm, col) in zip(names(t), columns(t))
606611
lu[nm] = col
607612
push!(ty, eltype(col))
@@ -610,6 +615,26 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
610615
return t
611616
end
612617

618+
function collect_cols!(
619+
rbi,
620+
rb_cols_lock,
621+
rb_cols,
622+
sch,
623+
batch,
624+
dictencodingslockable,
625+
convert,
626+
)
627+
@wkspawn begin
628+
cols = collect(VectorIterator(sch, batch, dictencodingslockable, convert))
629+
@lock rb_cols_lock begin
630+
if length(rb_cols) < rbi
631+
resize!(rb_cols, rbi)
632+
end
633+
rb_cols[rbi] = cols
634+
end
635+
end
636+
end
637+
613638
function getdictionaries!(dictencoded, field)
614639
d = field.dictionary
615640
if d !== nothing

src/utils.jl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ function writezeros(io::IO, n::Integer)
2828
s
2929
end
3030

31+
if isdefined(Base, :waitall)
32+
const _waitall = waitall
33+
else
34+
_waitall(tasks) = foreach(wait, tasks)
35+
end
36+
3137
# efficient writing of arrays
3238
writearray(io, col) = writearray(io, maybemissing(eltype(col)), col)
3339

0 commit comments

Comments
 (0)