diff --git a/Project.toml b/Project.toml index 5e24490..d28068f 100644 --- a/Project.toml +++ b/Project.toml @@ -2,7 +2,7 @@ name = "DistributedData" uuid = "f6a0035f-c5ac-4ad0-b410-ad102ced35df" authors = ["Mirek Kratochvil ", "LCSB R3 team "] -version = "0.1.2" +version = "0.1.3" [deps] Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" diff --git a/README.md b/README.md index 61a235d..70a8714 100644 --- a/README.md +++ b/README.md @@ -7,15 +7,26 @@ Simple distributed data manipulation and processing routines for Julia. + +#### Acknowledgements + This was originally developed for -[`GigaSOM.jl`](https://github.com/LCSB-BioCore/GigaSOM.jl); DistributedData.jl package -contains the separated-out lightweight distributed-processing framework that -was used in `GigaSOM.jl`. +[`GigaSOM.jl`](https://github.com/LCSB-BioCore/GigaSOM.jl); +`DistributedData.jl` package contains the separated-out lightweight +distributed-processing framework that was used in `GigaSOM.jl`. + +`DistributedData.jl` was developed at the +Luxembourg Centre for Systems Biomedicine of the University of Luxembourg ([uni.lu/lcsb](https://www.uni.lu/lcsb)). +The development was supported by +European Union ELIXIR Staff Exchange programme 2020 ([elixir-europe.org](https://elixir-europe.org/)), and +European Union's Horizon 2020 Programme under PerMedCoE project ([permedcoe.eu](https://www.permedcoe.eu/)) agreement no. 951773. + +Uni.lu logo   LCSB logo   ELIXIR logo   PerMedCoE logo ## Why? -DistributedData.jl provides a very simple, imperative and straightforward way to move your -data around a cluster of Julia processes created by the +`DistributedData.jl` provides a very simple, imperative and straightforward way +to move your data around a cluster of Julia processes created by the [`Distributed`](https://docs.julialang.org/en/v1/stdlib/Distributed/) package, and run computation on the distributed data pieces. 
The main aim of the package is to avoid anything complicated-- the first version used in @@ -133,7 +144,7 @@ julia> gather_array(dataset) # download the data from workers to a sing ⋮ ``` -## Using DistributedData.jl in HPC environments +## Using `DistributedData.jl` in HPC environments You can use [`ClusterManagers`](https://github.com/JuliaParallel/ClusterManagers.jl) diff --git a/docs/src/assets/elixir.svg b/docs/src/assets/elixir.svg new file mode 100644 index 0000000..0a5cb19 --- /dev/null +++ b/docs/src/assets/elixir.svg @@ -0,0 +1,66 @@ + +image/svg+xml diff --git a/docs/src/assets/lcsb.svg b/docs/src/assets/lcsb.svg new file mode 100644 index 0000000..01a3552 --- /dev/null +++ b/docs/src/assets/lcsb.svg @@ -0,0 +1,143 @@ + +image/svg+xml diff --git a/docs/src/assets/permedcoe.svg b/docs/src/assets/permedcoe.svg new file mode 100644 index 0000000..fe6c07d --- /dev/null +++ b/docs/src/assets/permedcoe.svg @@ -0,0 +1,144 @@ + + + + + + + + image/svg+xml + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/src/assets/unilu.svg b/docs/src/assets/unilu.svg new file mode 100644 index 0000000..1e63bb5 --- /dev/null +++ b/docs/src/assets/unilu.svg @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/src/DistributedData.jl b/src/DistributedData.jl index d339534..91a54db 100644 --- a/src/DistributedData.jl +++ b/src/DistributedData.jl @@ -17,13 +17,12 @@ export save_at, dtransform, dmapreduce, dmap, + dpmap, gather_array, tmp_symbol include("io.jl") -export dstore, - dload, - dunlink +export dstore, dload, dunlink include("tools.jl") export dcopy, diff --git a/src/base.jl b/src/base.jl index 8c7bdf4..1c705f3 100644 --- a/src/base.jl +++ b/src/base.jl @@ -30,7 +30,7 @@ The symbols are saved in Main module on the corresponding worker. For example, `save_at(1, :x, nothing)` _will_ erase your local `x` variable. Beware of name collisions. 
""" -function save_at(worker, sym::Symbol, val; mod=Main) +function save_at(worker, sym::Symbol, val; mod = Main) remotecall(() -> Base.eval(mod, :( begin $sym = $val @@ -45,7 +45,7 @@ end Get a value `val` from a remote `worker`; quoting of `val` works just as with `save_at`. Returns a future with the requested value. """ -function get_from(worker, val; mod=Main) +function get_from(worker, val; mod = Main) remotecall(() -> Base.eval(mod, :($val)), worker) end @@ -162,11 +162,7 @@ end Same as `dtransform`, but specialized for `Dinfo`. """ -function dtransform( - dInfo::Dinfo, - fn, - tgt::Symbol = dInfo.val, -)::Dinfo +function dtransform(dInfo::Dinfo, fn, tgt::Symbol = dInfo.val)::Dinfo dtransform(dInfo.val, fn, dInfo.workers, tgt) end @@ -314,11 +310,34 @@ Call a function `fn` on `workers`, with a single parameter arriving from the corresponding position in `arr`. """ function dmap(arr::Vector, fn, workers) - futures = [ - remotecall(() -> Base.eval(Main, :($fn($(arr[i])))), pid) #TODO convert to get_from - for (i, pid) in enumerate(workers) - ] - return [fetch(f) for f in futures] + fetch.([get_from(w, :($fn($(arr[i])))) for (i, w) in enumerate(workers)]) +end + +""" + dpmap(fn, args...; mod = Main, kwargs...) + +"Distributed pool map." + +A wrapper for `pmap` from the `Distributed` package that executes the code in the +correct module, so that it can access the distributed variables at remote +workers. All arguments other than the first function `fn` are passed to `pmap`. + +The function `fn` should return an expression that is going to get evaluated. + +# Example + +```julia +using Distributed +dpmap(x -> :(computeSomething(someData, \$x)), WorkerPool(workers()), Vector(1:10)) +``` + +```julia +di = distributeSomeData() +dpmap(x -> :(computeSomething(\$(di.val), \$x)), CachingPool(di.workers), Vector(1:10)) +``` +""" +function dpmap(fn, args...; mod = Main, kwargs...) + return pmap(x -> Base.eval(mod, fn(x)), args...; kwargs...) 
end """ diff --git a/src/io.jl b/src/io.jl index 8e7a6a2..69527b9 100644 --- a/src/io.jl +++ b/src/io.jl @@ -33,10 +33,7 @@ end Overloaded functionality for `Dinfo`. """ -function dstore( - dInfo::Dinfo, - files = defaultFiles(dInfo.val, dInfo.workers), -) +function dstore(dInfo::Dinfo, files = defaultFiles(dInfo.val, dInfo.workers)) dstore(dInfo.val, dInfo.workers, files) end @@ -47,16 +44,12 @@ Import the content of symbol `sym` by each worker specified by `pids` from the corresponding filename in `files`. """ function dload(sym::Symbol, pids, files = defaultFiles(sym, pids)) - dmap( - files, - (fn) -> Base.eval(Main, :( - begin - $sym = open($deserialize, $fn) - nothing - end - )), - pids, - ) + dmap(files, (fn) -> Base.eval(Main, :( + begin + $sym = open($deserialize, $fn) + nothing + end + )), pids) return Dinfo(sym, pids) end @@ -65,10 +58,7 @@ end Overloaded functionality for `Dinfo`. """ -function dload( - dInfo::Dinfo, - files = defaultFiles(dInfo.val, dInfo.workers), -) +function dload(dInfo::Dinfo, files = defaultFiles(dInfo.val, dInfo.workers)) dload(dInfo.val, dInfo.workers, files) end @@ -87,9 +77,6 @@ end Overloaded functionality for `Dinfo`. """ -function dunlink( - dInfo::Dinfo, - files = defaultFiles(dInfo.val, dInfo.workers), -) +function dunlink(dInfo::Dinfo, files = defaultFiles(dInfo.val, dInfo.workers)) dunlink(dInfo.val, dInfo.workers, files) end diff --git a/src/tools.jl b/src/tools.jl index 7afafc8..f2dfca8 100644 --- a/src/tools.jl +++ b/src/tools.jl @@ -13,11 +13,7 @@ end Reduce dataset to selected columns, optionally save it under a different name. """ -function dselect( - dInfo::Dinfo, - columns::Vector{Int}, - tgt::Symbol = dInfo.val, -)::Dinfo +function dselect(dInfo::Dinfo, columns::Vector{Int}, tgt::Symbol = dInfo.val)::Dinfo dtransform(dInfo, mtx -> mtx[:, columns], tgt) end @@ -91,10 +87,7 @@ end Compute mean and standard deviation of the columns in dataset. 
Returns a tuple with a vector of means in `columns`, and a vector of corresponding sdevs. """ -function dstat( - dInfo::Dinfo, - columns::Vector{Int}, -)::Tuple{Vector{Float64},Vector{Float64}} +function dstat(dInfo::Dinfo, columns::Vector{Int})::Tuple{Vector{Float64},Vector{Float64}} sum_squares = x -> sum(x .^ 2) @@ -136,8 +129,7 @@ function dstat_buckets( ) # extract the bucketed stats - (sums, sqsums, ns) = - dmapreduce([dInfo, buckets], get_bucketed_stats, combine_stats) + (sums, sqsums, ns) = dmapreduce([dInfo, buckets], get_bucketed_stats, combine_stats) return ( sums ./ ns, #means @@ -285,7 +277,8 @@ less or higher than `targets`. """ function update_extrema(counts, targets, lims, mids) broadcast( - (cnt, target, lim, mid) -> cnt >= target ? # if the count is too high, + (cnt, target, lim, mid) -> + cnt >= target ? # if the count is too high, (lim[1], mid) : # median is going to be in the lower half (mid, lim[2]), # otherwise in the higher half counts, @@ -313,11 +306,8 @@ function dmedian(dInfo::Dinfo, columns::Vector{Int}; iters = 20) target = dmapreduce(dInfo, d -> size(d, 1), +) ./ 2 # current estimation range for the median (tuples of min, max) - lims = dmapreduce( - dInfo, - d -> mapslices(extrema, d[:, columns], dims = 1), - reduce_extrema, - ) + lims = + dmapreduce(dInfo, d -> mapslices(extrema, d[:, columns], dims = 1), reduce_extrema) # convert the limits to a simple vector lims = cat(lims..., dims = 1) @@ -368,8 +358,8 @@ function dmedian_buckets( get_bucket_extrema = (d, b) -> catmapbuckets( (_, x) -> length(x) > 0 ? 
# if there are some elements - extrema(x) : # just take the extrema - (Inf, -Inf), # if not, use backup values + extrema(x) : # just take the extrema + (Inf, -Inf), # if not, use backup values d[:, columns], nbuckets, b, @@ -384,21 +374,22 @@ function dmedian_buckets( # this counts the elements smaller than mids in buckets # (both mids and elements are bucketed and column-sliced into matrices) bucketed_count_smaller_than_mids = - (d, b) -> vcat(mapbuckets( - (bucketID, d) -> - [ - count(x -> x < mids[bucketID, colID], d[:, colID]) - for (colID, c) in enumerate(columns) - ]', - d, - nbuckets, - b, - slicedims = (1, 2), - )...) + (d, b) -> vcat( + mapbuckets( + (bucketID, d) -> + [ + count(x -> x < mids[bucketID, colID], d[:, colID]) for + (colID, c) in enumerate(columns) + ]', + d, + nbuckets, + b, + slicedims = (1, 2), + )..., + ) # gather the counts - counts = - dmapreduce([dInfo, buckets], bucketed_count_smaller_than_mids, +) + counts = dmapreduce([dInfo, buckets], bucketed_count_smaller_than_mids, +) lims = update_extrema(counts, targets, lims, mids) end diff --git a/test/base.jl b/test/base.jl index adf0479..f171d9a 100644 --- a/test/base.jl +++ b/test/base.jl @@ -85,10 +85,21 @@ @test dmapreduce(:noname, x -> x, (a, b) -> a + b, []) == nothing end + @testset "`pmap` on distributed data" begin + fetch.(save_at.(W, :test, 1234321)) + di = Dinfo(:test, W) # also test the example in docs + @test dpmap( + x -> :($(di.val) + $x), + WorkerPool(di.workers), + [4321234, 1234, 4321], + ) == [5555555, 1235555, 1238642] + fetch.(remove_from.(W, :test)) + end + @testset "Internal utilities" begin @test DistributedData.tmp_symbol(:test) != :test - @test DistributedData.tmp_symbol(:test, prefix = "abc", - suffix = "def") == :abctestdef + @test DistributedData.tmp_symbol(:test, prefix = "abc", suffix = "def") == + :abctestdef @test DistributedData.tmp_symbol(Dinfo(:test, W)) != :test end