diff --git a/NEWS.md b/NEWS.md index 5193f2c08d..ecc6ef3e2f 100644 --- a/NEWS.md +++ b/NEWS.md @@ -46,12 +46,14 @@ values in `on` columns. These aspects of input data frames might affect the order of rows produced in the output ([#2612](https://github.com/JuliaData/DataFrames.jl/pull/2612), - ([#2622][https://github.com/JuliaData/DataFrames.jl/pull/2622]) + [#2622](https://github.com/JuliaData/DataFrames.jl/pull/2622)) * `DataFrame` constructor, `copy`, `getindex`, `select`, `select!`, `transform`, - `transform!`, and `combine` functions now use multiple threads in selected operations - ([#2647](https://github.com/JuliaData/DataFrames.jl/pull/2647)), - ([#2588](https://github.com/JuliaData/DataFrames.jl/pull/2588)), - ([#2574](https://github.com/JuliaData/DataFrames.jl/pull/2574)) + `transform!`, `combine`, `sort`, and join functions now use multiple threads + in selected operations + ([#2647](https://github.com/JuliaData/DataFrames.jl/pull/2647), + [#2588](https://github.com/JuliaData/DataFrames.jl/pull/2588), + [#2574](https://github.com/JuliaData/DataFrames.jl/pull/2574), + [#2664](https://github.com/JuliaData/DataFrames.jl/pull/2664)) # DataFrames v0.22 Release Notes diff --git a/src/join/composer.jl b/src/join/composer.jl index 4093b0d97e..d434cf93a1 100644 --- a/src/join/composer.jl +++ b/src/join/composer.jl @@ -96,8 +96,21 @@ function compose_inner_table(joiner::DataFrameJoiner, left_rename::Union{Function, AbstractString, Symbol}, right_rename::Union{Function, AbstractString, Symbol}) left_ixs, right_ixs = find_inner_rows(joiner) - dfl = joiner.dfl[left_ixs, :] - dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)] + + @static if VERSION >= v"1.4" + if Threads.nthreads() > 1 && length(left_ixs) >= 1_000_000 + dfl_task = Threads.@spawn joiner.dfl[left_ixs, :] + dfr_noon_task = Threads.@spawn joiner.dfr[right_ixs, Not(joiner.right_on)] + dfl = fetch(dfl_task) + dfr_noon = fetch(dfr_noon_task) + else + dfl = joiner.dfl[left_ixs, :] + dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)] + end + else + dfl = joiner.dfl[left_ixs, :] + dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)] + end ncleft = ncol(dfl) cols = Vector{AbstractVector}(undef, ncleft + ncol(dfr_noon)) @@ -207,21 +220,48 @@ function compose_joined_table(joiner::DataFrameJoiner, kind::Symbol, makeunique: @assert col_idx == ncol(joiner.dfl_on) + 1 - for col in eachcol(dfl_noon) - cols_i = left_idxs[col_idx] - cols[cols_i] = _similar_left(col, target_nrow) - copyto!(cols[cols_i], view(col, left_ixs)) - copyto!(cols[cols_i], lil + 1, view(col, leftonly_ixs), 1, loil) - col_idx += 1 - end - - @assert col_idx == ncol(joiner.dfl) + 1 - - for col in eachcol(dfr_noon) - cols[col_idx] = _similar_right(col, target_nrow) - copyto!(cols[col_idx], view(col, right_ixs)) - copyto!(cols[col_idx], lil + loil + 1, view(col, rightonly_ixs), 1, roil) - col_idx += 1 + @static if VERSION >= v"1.4" + if Threads.nthreads() > 1 && target_nrow >= 1_000_000 && length(cols) > col_idx + @sync begin + for col in eachcol(dfl_noon) + cols_i = left_idxs[col_idx] + Threads.@spawn _noon_compose_helper!(cols, _similar_left, cols_i, + col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil) + col_idx += 1 + end + @assert col_idx == ncol(joiner.dfl) + 1 + for col in eachcol(dfr_noon) + cols_i = col_idx + Threads.@spawn _noon_compose_helper!(cols, _similar_right, cols_i, col, target_nrow, + right_ixs, lil + loil + 1, rightonly_ixs, roil) + col_idx += 1 + end + end + else + for col in eachcol(dfl_noon) + _noon_compose_helper!(cols, _similar_left, left_idxs[col_idx], + col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil) + col_idx += 1 + end + @assert col_idx == ncol(joiner.dfl) + 1 + for col in eachcol(dfr_noon) + _noon_compose_helper!(cols, _similar_right, col_idx, col, target_nrow, + right_ixs, lil + loil + 1, rightonly_ixs, roil) + col_idx += 1 + end + end + else + for col in eachcol(dfl_noon) + _noon_compose_helper!(cols, _similar_left, left_idxs[col_idx], + col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil) + col_idx += 1 + end + @assert col_idx == ncol(joiner.dfl) + 1 + for col in eachcol(dfr_noon) + _noon_compose_helper!(cols, _similar_right, col_idx, col, target_nrow, + right_ixs, lil + loil + 1, rightonly_ixs, roil) + col_idx += 1 + end end @assert col_idx == length(cols) + 1 @@ -233,6 +273,21 @@ function compose_joined_table(joiner::DataFrameJoiner, kind::Symbol, makeunique: return res, src_indicator end +function _noon_compose_helper!(cols::Vector{AbstractVector}, # target container to populate + similar_col::Function, # function to use to materialize new column + cols_i::Integer, # index in cols to populate + col::AbstractVector, # source column + target_nrow::Integer, # target number of rows in new column + side_ixs::AbstractVector, # indices in col that were matched + offset::Integer, # offset to put non matched indices + sideonly_ixs::AbstractVector, # indices in col that were not + tocopy::Integer) # number on non-matched rows to copy + @assert tocopy == length(sideonly_ixs) + cols[cols_i] = similar_col(col, target_nrow) + copyto!(cols[cols_i], view(col, side_ixs)) + copyto!(cols[cols_i], offset, view(col, sideonly_ixs), 1, tocopy) +end + function _join(df1::AbstractDataFrame, df2::AbstractDataFrame; on::Union{<:OnType, AbstractVector}, kind::Symbol, makeunique::Bool, indicator::Union{Nothing, Symbol, AbstractString}, diff --git a/test/join.jl b/test/join.jl index 6ec9baa640..68afbbc4e8 100644 --- a/test/join.jl +++ b/test/join.jl @@ -1496,4 +1496,65 @@ end @test m1[!, :a] == m2[!, :a] end +@testset "threaded correctness" begin + df1 = DataFrame(id=[1:10^6; 10^7+1:10^7+2]) + df1.left_row = axes(df1, 1) + df2 = DataFrame(id=[1:10^6; 10^8+1:10^8+4]) + df2.right_row = axes(df2, 1) + + df_inner = DataFrame(id=1:10^6, left_row=1:10^6, right_row=1:10^6) + df_left = DataFrame(id=[1:10^6; 10^7+1:10^7+2], left_row=1:10^6+2, + right_row=[1:10^6; missing; missing]) + df_right = DataFrame(id=[1:10^6; 10^8+1:10^8+4], + left_row=[1:10^6; fill(missing, 4)], + right_row=1:10^6+4) + df_outer = DataFrame(id=[1:10^6; 10^7+1:10^7+2; 10^8+1:10^8+4], + left_row=[1:10^6+2; fill(missing, 4)], + right_row=[1:10^6; missing; missing; 10^6+1:10^6+4]) + df_semi = DataFrame(id=1:10^6, left_row=1:10^6) + df_anti = DataFrame(id=10^7+1:10^7+2, left_row=10^6+1:10^6+2) + + @test innerjoin(df1, df2, on=:id) ≅ df_inner + @test leftjoin(df1, df2, on=:id) ≅ df_left + @test rightjoin(df1, df2, on=:id) ≅ df_right + @test outerjoin(df1, df2, on=:id) ≅ df_outer + @test semijoin(df1, df2, on=:id) ≅ df_semi + @test antijoin(df1, df2, on=:id) ≅ df_anti + + Random.seed!(1234) + for i in 1:4 + df1 = df1[shuffle(axes(df1, 1)), :] + df2 = df2[shuffle(axes(df2, 1)), :] + @test sort!(innerjoin(df1, df2, on=:id)) ≅ df_inner + @test sort!(leftjoin(df1, df2, on=:id)) ≅ df_left + @test sort!(rightjoin(df1, df2, on=:id)) ≅ df_right + @test sort!(outerjoin(df1, df2, on=:id)) ≅ df_outer + @test sort!(semijoin(df1, df2, on=:id)) ≅ df_semi + @test sort!(antijoin(df1, df2, on=:id)) ≅ df_anti + end + + # test correctness of column order + df1 = DataFrame(a="a", id2=-[1:10^6; 10^7+1:10^7+2], b="b", + id1=[1:10^6; 10^7+1:10^7+2], c="c", d="d") + df2 = DataFrame(e="e", id1=[1:10^6; 10^8+1:10^8+4], f="f", g="g", + id2=-[1:10^6; 10^8+1:10^8+4], h="h") + @test innerjoin(df1, df2, on=[:id1, :id2]) ≅ + DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6, + c="c", d="d", e="e", f="f", g="g", h="h") + @test leftjoin(df1, df2, on=[:id1, :id2])[1:10^6, :] ≅ + DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6, + c="c", d="d", e="e", f="f", g="g", h="h") + @test rightjoin(df1, df2, on=[:id1, :id2])[1:10^6, :] ≅ + DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6, + c="c", d="d", e="e", f="f", g="g", h="h") + @test outerjoin(df1, df2, on=[:id1, :id2])[1:10^6, :] ≅ + DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6, + c="c", d="d", e="e", f="f", g="g", h="h") + @test semijoin(df1, df2, on=[:id1, :id2]) ≅ + DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6, c="c", d="d") + @test antijoin(df1, df2, on=[:id1, :id2]) ≅ + DataFrame(a="a", id2=-(10^7+1:10^7+2), b="b", id1=(10^7+1:10^7+2), + c="c", d="d") +end + end # module