Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support threading in joined column creation #2664

Merged
merged 5 commits into from
Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 72 additions & 17 deletions src/join/composer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,21 @@ function compose_inner_table(joiner::DataFrameJoiner,
left_rename::Union{Function, AbstractString, Symbol},
right_rename::Union{Function, AbstractString, Symbol})
left_ixs, right_ixs = find_inner_rows(joiner)
dfl = joiner.dfl[left_ixs, :]
dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)]

@static if VERSION >= v"1.4"
if Threads.nthreads() > 1 && length(left_ixs) >= 1_000_000
dfl_task = Threads.@spawn joiner.dfl[left_ixs, :]
dfr_noon_task = Threads.@spawn joiner.dfr[right_ixs, Not(joiner.right_on)]
dfl = fetch(dfl_task)
dfr_noon = fetch(dfr_noon_task)
else
dfl = joiner.dfl[left_ixs, :]
dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)]
end
else
dfl = joiner.dfl[left_ixs, :]
dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)]
end

ncleft = ncol(dfl)
cols = Vector{AbstractVector}(undef, ncleft + ncol(dfr_noon))
Expand Down Expand Up @@ -207,21 +220,48 @@ function compose_joined_table(joiner::DataFrameJoiner, kind::Symbol, makeunique:

@assert col_idx == ncol(joiner.dfl_on) + 1

for col in eachcol(dfl_noon)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not add threading for on-columns creation as in practice I expect that on-columns are minority of created columns and I avoid adding too much boilerplate code this way.

cols_i = left_idxs[col_idx]
cols[cols_i] = _similar_left(col, target_nrow)
copyto!(cols[cols_i], view(col, left_ixs))
copyto!(cols[cols_i], lil + 1, view(col, leftonly_ixs), 1, loil)
col_idx += 1
end

@assert col_idx == ncol(joiner.dfl) + 1

for col in eachcol(dfr_noon)
cols[col_idx] = _similar_right(col, target_nrow)
copyto!(cols[col_idx], view(col, right_ixs))
copyto!(cols[col_idx], lil + loil + 1, view(col, rightonly_ixs), 1, roil)
col_idx += 1
@static if VERSION >= v"1.4"
if Threads.nthreads() > 1 && target_nrow >= 1_000_000 && length(cols) > col_idx
@sync begin
for col in eachcol(dfl_noon)
cols_i = left_idxs[col_idx]
Threads.@spawn _noon_compose_helper!(cols, _similar_left, cols_i,
col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil)
col_idx += 1
end
@assert col_idx == ncol(joiner.dfl) + 1
for col in eachcol(dfr_noon)
cols_i = col_idx
Threads.@spawn _noon_compose_helper!(cols, _similar_right, cols_i, col, target_nrow,
right_ixs, lil + loil + 1, rightonly_ixs, roil)
col_idx += 1
end
end
else
for col in eachcol(dfl_noon)
_noon_compose_helper!(cols, _similar_left, left_idxs[col_idx],
col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil)
col_idx += 1
end
@assert col_idx == ncol(joiner.dfl) + 1
for col in eachcol(dfr_noon)
_noon_compose_helper!(cols, _similar_right, col_idx, col, target_nrow,
right_ixs, lil + loil + 1, rightonly_ixs, roil)
col_idx += 1
end
end
else
for col in eachcol(dfl_noon)
_noon_compose_helper!(cols, _similar_left, left_idxs[col_idx],
col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil)
col_idx += 1
end
@assert col_idx == ncol(joiner.dfl) + 1
for col in eachcol(dfr_noon)
_noon_compose_helper!(cols, _similar_right, col_idx, col, target_nrow,
right_ixs, lil + loil + 1, rightonly_ixs, roil)
col_idx += 1
end
end

@assert col_idx == length(cols) + 1
Expand All @@ -233,6 +273,21 @@ function compose_joined_table(joiner::DataFrameJoiner, kind::Symbol, makeunique:
return res, src_indicator
end

function _noon_compose_helper!(cols::Vector{AbstractVector}, # target container to populate
similar_col::Function, # function to use to materialize new column
cols_i::Integer, # index in cols to populate
col::AbstractVector, # source column
target_nrow::Integer, # target number of rows in new column
side_ixs::AbstractVector, # indices in col that were matched
offset::Integer, # offset to put non matched indices
sideonly_ixs::AbstractVector, # indices in col that were not
tocopy::Integer) # number on non-matched rows to copy
@assert tocopy == length(sideonly_ixs)
cols[cols_i] = similar_col(col, target_nrow)
copyto!(cols[cols_i], view(col, side_ixs))
copyto!(cols[cols_i], offset, view(col, sideonly_ixs), 1, tocopy)
end

function _join(df1::AbstractDataFrame, df2::AbstractDataFrame;
on::Union{<:OnType, AbstractVector}, kind::Symbol, makeunique::Bool,
indicator::Union{Nothing, Symbol, AbstractString},
Expand Down
61 changes: 61 additions & 0 deletions test/join.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1496,4 +1496,65 @@ end
@test m1[!, :a] == m2[!, :a]
end

@testset "threaded correctness" begin
df1 = DataFrame(id=[1:10^6; 10^7+1:10^7+2])
df1.left_row = axes(df1, 1)
df2 = DataFrame(id=[1:10^6; 10^8+1:10^8+4])
df2.right_row = axes(df2, 1)

df_inner = DataFrame(id=1:10^6, left_row=1:10^6, right_row=1:10^6)
df_left = DataFrame(id=[1:10^6; 10^7+1:10^7+2], left_row=1:10^6+2,
right_row=[1:10^6; missing; missing])
df_right = DataFrame(id=[1:10^6; 10^8+1:10^8+4],
left_row=[1:10^6; fill(missing, 4)],
right_row=1:10^6+4)
df_outer = DataFrame(id=[1:10^6; 10^7+1:10^7+2; 10^8+1:10^8+4],
left_row=[1:10^6+2; fill(missing, 4)],
right_row=[1:10^6; missing; missing; 10^6+1:10^6+4])
df_semi = DataFrame(id=1:10^6, left_row=1:10^6)
df_anti = DataFrame(id=10^7+1:10^7+2, left_row=10^6+1:10^6+2)

@test innerjoin(df1, df2, on=:id) ≅ df_inner
@test leftjoin(df1, df2, on=:id) ≅ df_left
@test rightjoin(df1, df2, on=:id) ≅ df_right
@test outerjoin(df1, df2, on=:id) ≅ df_outer
@test semijoin(df1, df2, on=:id) ≅ df_semi
@test antijoin(df1, df2, on=:id) ≅ df_anti

Random.seed!(1234)
for i in 1:4
df1 = df1[shuffle(axes(df1, 1)), :]
df2 = df2[shuffle(axes(df2, 1)), :]
@test sort!(innerjoin(df1, df2, on=:id)) ≅ df_inner
@test sort!(leftjoin(df1, df2, on=:id)) ≅ df_left
@test sort!(rightjoin(df1, df2, on=:id)) ≅ df_right
@test sort!(outerjoin(df1, df2, on=:id)) ≅ df_outer
@test sort!(semijoin(df1, df2, on=:id)) ≅ df_semi
@test sort!(antijoin(df1, df2, on=:id)) ≅ df_anti
end

# test correctness of column order
df1 = DataFrame(a="a", id2=-[1:10^6; 10^7+1:10^7+2], b="b",
id1=[1:10^6; 10^7+1:10^7+2], c="c", d="d")
df2 = DataFrame(e="e", id1=[1:10^6; 10^8+1:10^8+4], f="f", g="g",
id2=-[1:10^6; 10^8+1:10^8+4], h="h")
@test innerjoin(df1, df2, on=[:id1, :id2]) ≅
DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6,
c="c", d="d", e="e", f="f", g="g", h="h")
@test leftjoin(df1, df2, on=[:id1, :id2])[1:10^6, :] ≅
DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6,
c="c", d="d", e="e", f="f", g="g", h="h")
@test rightjoin(df1, df2, on=[:id1, :id2])[1:10^6, :] ≅
DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6,
c="c", d="d", e="e", f="f", g="g", h="h")
@test outerjoin(df1, df2, on=[:id1, :id2])[1:10^6, :] ≅
DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6,
c="c", d="d", e="e", f="f", g="g", h="h")
@test semijoin(df1, df2, on=[:id1, :id2]) ≅
DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6, c="c", d="d")
@test antijoin(df1, df2, on=[:id1, :id2]) ≅
DataFrame(a="a", id2=-(10^7+1:10^7+2), b="b", id1=(10^7+1:10^7+2),
c="c", d="d")
end

end # module