diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 8c23d25..9e60705 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -8,7 +8,6 @@ variables:
 .global_settings: &global_settings
   tags:
-    - artenolis
     - slave01
 
 .global_testing: &global_testing
@@ -19,27 +18,17 @@ variables:
   script:
     - Invoke-Expression $Env:ARTENOLIS_SOFT_PATH"\julia\"$Env:JULIA_VER"\bin\julia --color=yes --project=@. -e 'import Pkg; Pkg.test(; coverage = true)'"
 
-linux julia v1.5:
+linux julia v1.6:
   stage: build
   variables:
-    JULIA_VER: "v1.5.3"
+    JULIA_VER: "v1.6.0"
   <<: *global_settings
   <<: *global_testing
 
 windows10:
   stage: build
   tags:
-    - artenolis
     - windows10
   variables:
-    JULIA_VER: "v1.5.3"
+    JULIA_VER: "v1.6.0"
   <<: *global_testing_win
-
-windows8:
-  stage: build
-  tags:
-    - artenolis
-    - windows8
-  variables:
-    JULIA_VER: "v1.5.3"
-  <<: *global_testing_win
\ No newline at end of file
diff --git a/Project.toml b/Project.toml
index d28068f..fad4f91 100644
--- a/Project.toml
+++ b/Project.toml
@@ -2,7 +2,7 @@ name = "DistributedData"
 uuid = "f6a0035f-c5ac-4ad0-b410-ad102ced35df"
 authors = ["Mirek Kratochvil ", "LCSB R3 team "]
-version = "0.1.3"
+version = "0.2.0"
 
 [deps]
 Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
diff --git a/docs/make.jl b/docs/make.jl
index 3ad9663..210521a 100644
--- a/docs/make.jl
+++ b/docs/make.jl
@@ -1,10 +1,13 @@
 using Documenter, DistributedData
 
-makedocs(modules = [DistributedData],
+makedocs(
+    modules = [DistributedData],
     clean = false,
-    format = Documenter.HTML(prettyurls = !("local" in ARGS),
-        canonical = "https://lcsb-biocore.github.io/DistributedData.jl/stable/",
-        assets = ["assets/logo.ico"]),
+    format = Documenter.HTML(
+        prettyurls = !("local" in ARGS),
+        canonical = "https://lcsb-biocore.github.io/DistributedData.jl/stable/",
+        assets = ["assets/logo.ico"],
+    ),
     sitename = "DistributedData.jl",
     authors = "The developers of DistributedData.jl",
     linkcheck = !("skiplinks" in ARGS),
@@ -20,5 +23,5 @@ deploydocs(
     target = "build",
     branch = "gh-pages",
     devbranch = "develop",
-    push_preview = true
+    push_preview = true,
 )
diff --git a/docs/slurm-example/run-analysis.jl b/docs/slurm-example/run-analysis.jl
index 80a3564..fcda404 100644
--- a/docs/slurm-example/run-analysis.jl
+++ b/docs/slurm-example/run-analysis.jl
@@ -1,14 +1,14 @@
-using Distributed, ClusterManagers, DistributedData 
+using Distributed, ClusterManagers, DistributedData
 
 # read the number of available workers from environment and start the worker processes
-n_workers = parse(Int , ENV["SLURM_NTASKS"])
-addprocs_slurm(n_workers , topology =:master_worker)
+n_workers = parse(Int, ENV["SLURM_NTASKS"])
+addprocs_slurm(n_workers, topology = :master_worker)
 
 # load the required packages on all workers
 @everywhere using DistributedData
 
 # generate a random dataset on all workers
-dataset = dtransform((), _ -> randn(10000,10000), workers(), :myData)
+dataset = dtransform((), _ -> randn(10000, 10000), workers(), :myData)
 
 # for demonstration, sum the whole dataset
 totalResult = dmapreduce(dataset, sum, +)
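As a side note to the reformatted SLURM script above: the same pipeline can be tried without a cluster allocation by swapping `addprocs_slurm` for plain `addprocs`. This is a minimal local sketch; the worker count and the smaller matrix size are arbitrary choices, not part of the patch:

```julia
using Distributed, DistributedData

# start 4 local worker processes instead of SLURM-allocated ones
addprocs(4, topology = :master_worker)

# load the package on all workers
@everywhere using DistributedData

# generate a small random dataset on each worker
dataset = dtransform((), _ -> randn(1000, 1000), workers(), :myData)

# for demonstration, sum the whole distributed dataset
totalResult = dmapreduce(dataset, sum, +)
```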
diff --git a/docs/src/tutorial.md b/docs/src/tutorial.md
index b59a1dc..1c9ada8 100644
--- a/docs/src/tutorial.md
+++ b/docs/src/tutorial.md
@@ -153,8 +153,8 @@ run asynchronously, they are processed concurrently, thus faster.
 To illustrate the difference, the following code distributes some random data
 and then synchronizes correctly, but is essentially serial:
 ```julia
-julia> @time for i in workers()
-           fetch(save_at(i, :x, :(randn(10000,10000))))
+julia> @time for w in workers()
+           fetch(save_at(w, :x, :(randn(10000,10000))))
        end
   1.073267 seconds (346 allocations: 12.391 KiB)
 ```
@@ -164,7 +164,8 @@ make the code parallel, and usually a few times faster (depending on the number
 of workers):
 ```julia
-julia> @time fetch.([save_at(i, :x, :(randn(10000,10000))) for i in workers()])
+julia> @time map(fetch, [save_at(w, :x, :(randn(10000,10000)))
+                         for w in workers()])
   0.403235 seconds (44.50 k allocations: 2.277 MiB)
 3-element Array{Nothing,1}:
  nothing
@@ -175,8 +176,8 @@ The same is applicable for retrieving the sub-results in parallel. This example
 demonstrates that multiple workers can do some work at the same time:
 ```julia
-julia> @time fetch.([get_from(i, :(begin sleep(1); myid(); end))
-                     for i in workers()])
+julia> @time map(fetch, [get_from(i, :(begin sleep(1); myid(); end))
+                         for i in workers()])
   1.027651 seconds (42.26 k allocations: 2.160 MiB)
 3-element Array{Int64,1}:
  2
@@ -211,7 +212,8 @@ of individual workers. The storage of the variables is otherwise same as with
 the basic data-moving function -- you can e.g. manually check the size of the
 resulting slices on each worker using `get_from`:
 ```julia
-julia> fetch.([get_from(w, :(size($(dataset.val)))) for w in dataset.workers])
+julia> map(fetch, [get_from(w, :(size($(dataset.val))))
+                   for w in dataset.workers])
 3-element Array{Tuple{Int64,Int64},1}:
  (333, 3)
  (333, 3)
diff --git a/src/DistributedData.jl b/src/DistributedData.jl
index 91a54db..f00fbba 100644
--- a/src/DistributedData.jl
+++ b/src/DistributedData.jl
@@ -19,7 +19,8 @@ export save_at,
     dmap,
     dpmap,
     gather_array,
-    tmp_symbol
+    tmp_symbol,
+    @remote
 
 include("io.jl")
 export dstore, dload, dunlink
diff --git a/src/base.jl b/src/base.jl
index 1c705f3..055c425 100644
--- a/src/base.jl
+++ b/src/base.jl
@@ -1,3 +1,4 @@
+
 """
     save_at(worker, sym, val)
 
@@ -46,7 +47,7 @@ Get a value `val` from a remote `worker`; quoting of `val` works just as with
 `save_at`. Returns a future with the requested value.
 """
 function get_from(worker, val; mod = Main)
-    remotecall(() -> Base.eval(mod, :($val)), worker)
+    remotecall(() -> Base.eval(mod, val), worker)
 end
 
 """
@@ -68,28 +69,25 @@ function remove_from(worker, sym::Symbol)
 end
 
 """
-    scatter_array(sym, x::Array, pids; dim=1)::Dinfo
+    scatter_array(sym, x::Array, workers; dim=1)::Dinfo
 
 Distribute roughly equal parts of array `x` separated on dimension `dim` among
-`pids` into a worker-local variable `sym`.
+`workers` into a worker-local variable `sym`.
 
 Returns the `Dinfo` structure for the distributed data.
 """
-function scatter_array(sym::Symbol, x::Array, pids; dim = 1)::Dinfo
-    n = length(pids)
+function scatter_array(sym::Symbol, x::Array, workers; dim = 1)::Dinfo
+    n = length(workers)
     dims = size(x)
-    for f in [
-        begin
-            extent = [(1:s) for s in dims]
-            extent[dim] = (1+div((wid - 1) * dims[dim], n)):div(wid * dims[dim], n)
-            save_at(pid, sym, x[extent...])
-        end for (wid, pid) in enumerate(pids)
-    ]
-        fetch(f)
+    asyncmap(enumerate(workers)) do (i, pid)
+        extent = [(1:s) for s in dims]
+        extent[dim] = (1+div((i - 1) * dims[dim], n)):div(i * dims[dim], n)
+        wait(save_at(pid, sym, x[extent...]))
+        nothing
     end
-    return Dinfo(sym, pids)
+    return Dinfo(sym, workers)
 end
 
 """
@@ -98,8 +96,8 @@ end
 Remove the loaded data from workers.
 """
 function unscatter(sym::Symbol, workers)
-    for f in [remove_from(pid, sym) for pid in workers]
-        fetch(f)
+    asyncmap(workers) do pid
+        wait(remove_from(pid, sym))
     end
 end
 
@@ -120,14 +118,15 @@ collected. This is optimal for various side-effect-causing computations that
 are not easily expressible with `dtransform`.
 """
 function dexec(val, fn, workers)
-    for f in [get_from(pid, :(
-        begin
-            $fn($val)
-            nothing
-        end
-    )) for pid in workers]
-        fetch(f)
+    asyncmap(workers) do pid
+        wait(get_from(pid, :(
+            begin
+                $fn($val)
+                nothing
+            end
+        )))
     end
+    nothing
 end
 
@@ -151,8 +150,8 @@ in-place, by a function `fn`. Store the result as `tgt` (default `val`)
     dtransform(:myData, (d)->(2*d), workers())
 """
 function dtransform(val, fn, workers, tgt::Symbol = val)::Dinfo
-    for f in [save_at(pid, tgt, :($fn($val))) for pid in workers]
-        fetch(f)
+    asyncmap(workers) do pid
+        wait(save_at(pid, tgt, :($fn($val))))
     end
     return Dinfo(tgt, workers)
 end
 
@@ -167,7 +166,7 @@ function dtransform(dInfo::Dinfo, fn, tgt::Symbol = dInfo.val)::Dinfo
 end
 
 """
-    dmapreduce(val, map, fold, workers)
+    dmapreduce(val, map, fold, workers; prefetch = :all)
 
 A distributed work-alike of the standard `mapreduce`: Take a function `map` (a
 non-modifying transform on the data) and `fold` (2-to-1 reduction of the
@@ -178,8 +177,10 @@ It is assumed that the fold operation is associative, but not commutative (as
 in semigroups). If there are no workers, operation returns `nothing` (we don't
 have a monoid to magically conjure zero elements :[ ).
 
-In current version, the reduce step is a sequential left fold, executed in the
-main process.
+In the current version, the reduce step is a sequential left fold, executed in
+the main process. Parameter `prefetch` says how many futures should be
+`fetch`ed in advance; increasing prefetch improves the throughput but increases
+memory usage in case the results of `map` are big.
 
 # Example
     # compute the mean of all distributed data
@@ -200,22 +201,39 @@ example, distributed values `:a` and `:b` can be joined as such:
         vcat,
         workers())
 """
-function dmapreduce(val, map, fold, workers)
-    if isempty(workers)
-        return nothing
+function dmapreduce(val, map, fold, workers; prefetch = :all)
+    if prefetch == :all
+        prefetch = length(workers)
     end
 
-    futures = [get_from(pid, :($map($val))) for pid in workers]
-    res = fetch(futures[1])
+    futures = asyncmap(workers) do pid
+        get_from(pid, :($map($val)))
+    end
 
-    # replace the collected futures with new empty futures to allow them to be
-    # GC'd and free memory for more incoming results
-    futures[1] = Future()
+    res = nothing
+    prefetched = 0
+
+    @sync for i in eachindex(futures)
+        # start fetching a few futures in advance
+        while prefetched < min(i + prefetch, length(futures))
+            prefetched += 1
+            # dodge deadlock
+            if workers[prefetched] != myid()
+                @async fetch(futures[$prefetched])
+            end
+        end
 
-    for i = 2:length(futures)
-        res = fold(res, fetch(futures[i]))
+        if i == 1
+            # nothing to fold yet
+            res = fetch(futures[i])
+        else
+            res = fold(res, fetch(futures[i]))
+        end
+
+        # replace the collected future with an empty structure so that the data
+        # can be GC'd, freeing memory for more incoming results
         futures[i] = Future()
     end
+
     res
 end
 
@@ -274,18 +292,16 @@ This preallocates the array for results, and is thus more efficient than e.g.
 using `dmapreduce` with `vcat` for folding.
 """
 function gather_array(val::Symbol, workers, dim = 1; free = false)
-    size0 = get_val_from(workers[1], :(size($val)))
-    innerType = get_val_from(workers[1], :(typeof($val).parameters[1]))
+    (size0, innerType) = get_val_from(workers[1], :((size($val), eltype($val))))
     sizes = dmapreduce(val, d -> size(d, dim), vcat, workers)
     ressize = [size0[i] for i = 1:length(size0)]
     ressize[dim] = sum(sizes)
+    offs = [0; cumsum(sizes)]
     result = zeros(innerType, ressize...)
-    off = 0
-    for (i, pid) in enumerate(workers)
+    asyncmap(enumerate(workers)) do (i, pid)
         idx = [(1:ressize[j]) for j = 1:length(ressize)]
-        idx[dim] = ((off+1):(off+sizes[i]))
+        idx[dim] = (offs[i]+1):(offs[i+1])
         result[idx...] = get_val_from(pid, val)
-        off += sizes[i]
     end
     if free
         unscatter(val, workers)
@@ -310,7 +326,9 @@ Call a function `fn` on `workers`, with a single parameter arriving from the
 corresponding position in `arr`.
 """
 function dmap(arr::Vector, fn, workers)
-    fetch.([get_from(w, :($fn($(arr[i])))) for (i, w) in enumerate(workers)])
+    asyncmap(enumerate(workers)) do (i, w)
+        get_val_from(w, :($fn($(arr[i]))))
+    end
 end
 
 """
@@ -358,3 +376,41 @@ Decorate the symbol from `dInfo` with prefix and suffix.
 """
 function tmp_symbol(dInfo::Dinfo; prefix = "", suffix = "_tmp")
     return tmp_symbol(dInfo.val, prefix = prefix, suffix = suffix)
 end
+
+"""
+    @remote module expr
+
+A version of [`@remote`](@ref) that adds additional choice of the module for
+scope.
+"""
+macro remote(mod, x)
+    :(Base.eval($mod, $(QuoteNode(x))))
+end
+
+"""
+    @remote expr
+
+In a function that will get evaluated on a remote worker, this ensures the
+evaluation scope of the expression `expr` (usually a variable) is taken on
+the remote side, preventing namespace clash with the local session.
+
+This is mainly useful for making the functions from `Distributed` package (such
+as `pmap` and `remotecall`) work with the data stored by `DistributedData`
+package.
+
+Internally, this is handled by wrapping in `eval`.
+
+# Example
+```
+julia> save_at(2, :x, 321)
+Future(2, 1, 162, nothing)
+
+julia> let x=123
+           remotecall_fetch(() -> x + (@remote x), 2)
+       end
+444
+```
+"""
+macro remote(x)
+    :(@remote Main $x)
+end
diff --git a/test/base.jl b/test/base.jl
index f171d9a..e45dbc1 100644
--- a/test/base.jl
+++ b/test/base.jl
@@ -63,6 +63,16 @@
             sum(orig .^ 2),
         )
 
+        @test isapprox(
+            dmapreduce(:test, d -> sum(d .^ 2), (a, b) -> a + b, W),
+            dmapreduce(:test, d -> sum(d .^ 2), (a, b) -> a + b, W; prefetch = 0),
+        )
+
+        @test isapprox(
+            dmapreduce(:test, d -> sum(d .^ 2), (a, b) -> a + b, W),
+            dmapreduce(:test, d -> sum(d .^ 2), (a, b) -> a + b, W; prefetch = 2),
+        )
+
         dtransform(di, d -> d .* 2)
         @test orig .* 2 == gather_array(:test, W)
 
@@ -86,14 +96,14 @@ end
     @testset "`pmap` on distributed data" begin
-        fetch.(save_at.(W, :test, 1234321))
+        map(fetch, save_at.(W, :test, 1234321))
         di = Dinfo(:test, W)
         # also test the example in docs
         @test dpmap(
             x -> :($(di.val) + $x),
             WorkerPool(di.workers),
             [4321234, 1234, 4321],
         ) == [5555555, 1235555, 1238642]
-        fetch.(remove_from.(W, :test))
+        map(fetch, remove_from.(W, :test))
     end
 
     @testset "Internal utilities" begin
@@ -129,6 +139,16 @@
         @test all([!isfile(f) for f in files])
     end
 
+    @testset "@remote macro" begin
+        di = dtransform(:(), _ -> myid(), W, :test)
+
+        test = 333
+
+        for pid in W
+            @test remotecall_fetch(() -> test + (@remote test), pid) == test + pid
+        end
+    end
+
     rmprocs(W)
     W = nothing