Skip to content

Commit

Permalink
Support threading in joined column creation (#2664)
Browse files Browse the repository at this point in the history
  • Loading branch information
bkamins authored Mar 22, 2021
1 parent 872ec18 commit 7564352
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 22 deletions.
12 changes: 7 additions & 5 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@
values in `on` columns. These aspects of input data frames might affect the
order of rows produced in the output
([#2612](https://github.com/JuliaData/DataFrames.jl/pull/2612),
([#2622][https://github.com/JuliaData/DataFrames.jl/pull/2622])
[#2622](https://github.com/JuliaData/DataFrames.jl/pull/2622))
* `DataFrame` constructor, `copy`, `getindex`, `select`, `select!`, `transform`,
`transform!`, and `combine` functions now use multiple threads in selected operations
([#2647](https://github.com/JuliaData/DataFrames.jl/pull/2647)),
([#2588](https://github.com/JuliaData/DataFrames.jl/pull/2588)),
([#2574](https://github.com/JuliaData/DataFrames.jl/pull/2574))
`transform!`, `combine`, `sort`, and join functions now use multiple threads
in selected operations
([#2647](https://github.com/JuliaData/DataFrames.jl/pull/2647),
[#2588](https://github.com/JuliaData/DataFrames.jl/pull/2588),
[#2574](https://github.com/JuliaData/DataFrames.jl/pull/2574),
[#2664](https://github.com/JuliaData/DataFrames.jl/pull/2664))

# DataFrames v0.22 Release Notes

Expand Down
89 changes: 72 additions & 17 deletions src/join/composer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,21 @@ function compose_inner_table(joiner::DataFrameJoiner,
left_rename::Union{Function, AbstractString, Symbol},
right_rename::Union{Function, AbstractString, Symbol})
left_ixs, right_ixs = find_inner_rows(joiner)
dfl = joiner.dfl[left_ixs, :]
dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)]

@static if VERSION >= v"1.4"
if Threads.nthreads() > 1 && length(left_ixs) >= 1_000_000
dfl_task = Threads.@spawn joiner.dfl[left_ixs, :]
dfr_noon_task = Threads.@spawn joiner.dfr[right_ixs, Not(joiner.right_on)]
dfl = fetch(dfl_task)
dfr_noon = fetch(dfr_noon_task)
else
dfl = joiner.dfl[left_ixs, :]
dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)]
end
else
dfl = joiner.dfl[left_ixs, :]
dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)]
end

ncleft = ncol(dfl)
cols = Vector{AbstractVector}(undef, ncleft + ncol(dfr_noon))
Expand Down Expand Up @@ -207,21 +220,48 @@ function compose_joined_table(joiner::DataFrameJoiner, kind::Symbol, makeunique:

@assert col_idx == ncol(joiner.dfl_on) + 1

for col in eachcol(dfl_noon)
cols_i = left_idxs[col_idx]
cols[cols_i] = _similar_left(col, target_nrow)
copyto!(cols[cols_i], view(col, left_ixs))
copyto!(cols[cols_i], lil + 1, view(col, leftonly_ixs), 1, loil)
col_idx += 1
end

@assert col_idx == ncol(joiner.dfl) + 1

for col in eachcol(dfr_noon)
cols[col_idx] = _similar_right(col, target_nrow)
copyto!(cols[col_idx], view(col, right_ixs))
copyto!(cols[col_idx], lil + loil + 1, view(col, rightonly_ixs), 1, roil)
col_idx += 1
@static if VERSION >= v"1.4"
if Threads.nthreads() > 1 && target_nrow >= 1_000_000 && length(cols) > col_idx
@sync begin
for col in eachcol(dfl_noon)
cols_i = left_idxs[col_idx]
Threads.@spawn _noon_compose_helper!(cols, _similar_left, cols_i,
col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil)
col_idx += 1
end
@assert col_idx == ncol(joiner.dfl) + 1
for col in eachcol(dfr_noon)
cols_i = col_idx
Threads.@spawn _noon_compose_helper!(cols, _similar_right, cols_i, col, target_nrow,
right_ixs, lil + loil + 1, rightonly_ixs, roil)
col_idx += 1
end
end
else
for col in eachcol(dfl_noon)
_noon_compose_helper!(cols, _similar_left, left_idxs[col_idx],
col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil)
col_idx += 1
end
@assert col_idx == ncol(joiner.dfl) + 1
for col in eachcol(dfr_noon)
_noon_compose_helper!(cols, _similar_right, col_idx, col, target_nrow,
right_ixs, lil + loil + 1, rightonly_ixs, roil)
col_idx += 1
end
end
else
for col in eachcol(dfl_noon)
_noon_compose_helper!(cols, _similar_left, left_idxs[col_idx],
col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil)
col_idx += 1
end
@assert col_idx == ncol(joiner.dfl) + 1
for col in eachcol(dfr_noon)
_noon_compose_helper!(cols, _similar_right, col_idx, col, target_nrow,
right_ixs, lil + loil + 1, rightonly_ixs, roil)
col_idx += 1
end
end

@assert col_idx == length(cols) + 1
Expand All @@ -233,6 +273,21 @@ function compose_joined_table(joiner::DataFrameJoiner, kind::Symbol, makeunique:
return res, src_indicator
end

# Build a single joined-table column for a column that is not a join key
# and store it in `cols[cols_i]`.
#
# Arguments:
# * `cols`         - container of output columns; slot `cols_i` is overwritten
# * `similar_col`  - called as `similar_col(col, target_nrow)` to allocate
#                    the output column
# * `cols_i`       - index of the slot in `cols` to populate
# * `col`          - source column
# * `target_nrow`  - number of rows requested for the output column
# * `side_ixs`     - indices into `col` of the rows that were matched
# * `offset`       - first position in the output column that receives
#                    the non-matched rows
# * `sideonly_ixs` - indices into `col` of the rows that were not matched
# * `tocopy`       - number of non-matched rows to copy; must be equal to
#                    `length(sideonly_ixs)`
#
# The value of the final `copyto!` call (the destination column) is returned.
function _noon_compose_helper!(cols::Vector{AbstractVector},
                               similar_col::Function,
                               cols_i::Integer,
                               col::AbstractVector,
                               target_nrow::Integer,
                               side_ixs::AbstractVector,
                               offset::Integer,
                               sideonly_ixs::AbstractVector,
                               tocopy::Integer)
    @assert tocopy == length(sideonly_ixs)

    # Register the freshly allocated column before filling it, then keep
    # working through the local binding instead of re-reading `cols`.
    newcol = similar_col(col, target_nrow)
    cols[cols_i] = newcol

    # Matched rows are written starting at the first position of the column...
    copyto!(newcol, @view col[side_ixs])
    # ...and the non-matched rows are written starting at position `offset`.
    return copyto!(newcol, offset, @view(col[sideonly_ixs]), 1, tocopy)
end

function _join(df1::AbstractDataFrame, df2::AbstractDataFrame;
on::Union{<:OnType, AbstractVector}, kind::Symbol, makeunique::Bool,
indicator::Union{Nothing, Symbol, AbstractString},
Expand Down
61 changes: 61 additions & 0 deletions test/join.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1496,4 +1496,65 @@ end
@test m1[!, :a] == m2[!, :a]
end

# Checks that joins give the expected result on inputs with more than 10^6
# rows, both for row contents and for the order of output columns.
# All comparisons use `isequal` because several expected data frames contain
# `missing`, for which `==` would produce `missing` instead of `true`.
@testset "threaded correctness" begin
    df1 = DataFrame(id=[1:10^6; 10^7+1:10^7+2])
    df1.left_row = axes(df1, 1)
    df2 = DataFrame(id=[1:10^6; 10^8+1:10^8+4])
    df2.right_row = axes(df2, 1)

    # expected results for every join kind
    df_inner = DataFrame(id=1:10^6, left_row=1:10^6, right_row=1:10^6)
    df_left = DataFrame(id=[1:10^6; 10^7+1:10^7+2], left_row=1:10^6+2,
                        right_row=[1:10^6; missing; missing])
    df_right = DataFrame(id=[1:10^6; 10^8+1:10^8+4],
                         left_row=[1:10^6; fill(missing, 4)],
                         right_row=1:10^6+4)
    df_outer = DataFrame(id=[1:10^6; 10^7+1:10^7+2; 10^8+1:10^8+4],
                         left_row=[1:10^6+2; fill(missing, 4)],
                         right_row=[1:10^6; missing; missing; 10^6+1:10^6+4])
    df_semi = DataFrame(id=1:10^6, left_row=1:10^6)
    df_anti = DataFrame(id=10^7+1:10^7+2, left_row=10^6+1:10^6+2)

    @test isequal(innerjoin(df1, df2, on=:id), df_inner)
    @test isequal(leftjoin(df1, df2, on=:id), df_left)
    @test isequal(rightjoin(df1, df2, on=:id), df_right)
    @test isequal(outerjoin(df1, df2, on=:id), df_outer)
    @test isequal(semijoin(df1, df2, on=:id), df_semi)
    @test isequal(antijoin(df1, df2, on=:id), df_anti)

    # after shuffling the inputs the sorted result must stay the same
    Random.seed!(1234)
    for i in 1:4
        df1 = df1[shuffle(axes(df1, 1)), :]
        df2 = df2[shuffle(axes(df2, 1)), :]
        @test isequal(sort!(innerjoin(df1, df2, on=:id)), df_inner)
        @test isequal(sort!(leftjoin(df1, df2, on=:id)), df_left)
        @test isequal(sort!(rightjoin(df1, df2, on=:id)), df_right)
        @test isequal(sort!(outerjoin(df1, df2, on=:id)), df_outer)
        @test isequal(sort!(semijoin(df1, df2, on=:id)), df_semi)
        @test isequal(sort!(antijoin(df1, df2, on=:id)), df_anti)
    end

    # test correctness of column order
    df1 = DataFrame(a="a", id2=-[1:10^6; 10^7+1:10^7+2], b="b",
                    id1=[1:10^6; 10^7+1:10^7+2], c="c", d="d")
    df2 = DataFrame(e="e", id1=[1:10^6; 10^8+1:10^8+4], f="f", g="g",
                    id2=-[1:10^6; 10^8+1:10^8+4], h="h")
    # the first 10^6 rows are expected to be identical for inner, left,
    # right and outer joins, so the expected data frame is built only once
    df_matched = DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6,
                           c="c", d="d", e="e", f="f", g="g", h="h")
    @test isequal(innerjoin(df1, df2, on=[:id1, :id2]), df_matched)
    @test isequal(leftjoin(df1, df2, on=[:id1, :id2])[1:10^6, :], df_matched)
    @test isequal(rightjoin(df1, df2, on=[:id1, :id2])[1:10^6, :], df_matched)
    @test isequal(outerjoin(df1, df2, on=[:id1, :id2])[1:10^6, :], df_matched)
    @test isequal(semijoin(df1, df2, on=[:id1, :id2]),
                  DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6,
                            c="c", d="d"))
    @test isequal(antijoin(df1, df2, on=[:id1, :id2]),
                  DataFrame(a="a", id2=-(10^7+1:10^7+2), b="b",
                            id1=(10^7+1:10^7+2), c="c", d="d"))
end

end # module

0 comments on commit 7564352

Please sign in to comment.