Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support threading in joined column creation #2664

Merged
merged 5 commits into from
Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 65 additions & 17 deletions src/join/composer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,21 @@ function compose_inner_table(joiner::DataFrameJoiner,
left_rename::Union{Function, AbstractString, Symbol},
right_rename::Union{Function, AbstractString, Symbol})
left_ixs, right_ixs = find_inner_rows(joiner)
dfl = joiner.dfl[left_ixs, :]
dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)]

@static if VERSION >= v"1.4"
if Threads.nthreads() > 1 && length(left_ixs) >= 1_000_000
dfl_task = Threads.@spawn joiner.dfl[left_ixs, :]
dfr_noon_task = Threads.@spawn joiner.dfr[right_ixs, Not(joiner.right_on)]
dfl = fetch(dfl_task)
dfr_noon = fetch(dfr_noon_task)
else
dfl = joiner.dfl[left_ixs, :]
dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)]
end
else
dfl = joiner.dfl[left_ixs, :]
dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)]
end

ncleft = ncol(dfl)
cols = Vector{AbstractVector}(undef, ncleft + ncol(dfr_noon))
Expand Down Expand Up @@ -207,21 +220,49 @@ function compose_joined_table(joiner::DataFrameJoiner, kind::Symbol, makeunique:

@assert col_idx == ncol(joiner.dfl_on) + 1

for col in eachcol(dfl_noon)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not add threading for on-columns creation as in practice I expect that on-columns are minority of created columns and I avoid adding too much boilerplate code this way.

cols_i = left_idxs[col_idx]
cols[cols_i] = _similar_left(col, target_nrow)
copyto!(cols[cols_i], view(col, left_ixs))
copyto!(cols[cols_i], lil + 1, view(col, leftonly_ixs), 1, loil)
col_idx += 1
end

@assert col_idx == ncol(joiner.dfl) + 1

for col in eachcol(dfr_noon)
cols[col_idx] = _similar_right(col, target_nrow)
copyto!(cols[col_idx], view(col, right_ixs))
copyto!(cols[col_idx], lil + loil + 1, view(col, rightonly_ixs), 1, roil)
col_idx += 1
@static if VERSION >= v"1.4"
if Threads.nthreads() > 1 && target_nrow >= 1_000_000 && length(cols) > col_idx
@sync begin
for col in eachcol(dfl_noon)
cols_i = left_idxs[col_idx]
Threads.@spawn _noon_compose_helper(cols, _similar_left, cols_i,
col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil)
col_idx += 1
end
@assert col_idx == ncol(joiner.dfl) + 1
for col in eachcol(dfr_noon)
let cols_i = col_idx
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you use let here but not above?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right - let is not needed, as just writing cols_i = col_idx will ensure we get a new binding for cols_i. I will change this.

Threads.@spawn _noon_compose_helper(cols, _similar_right, cols_i, col, target_nrow,
right_ixs, lil + loil + 1, rightonly_ixs, roil)
end
col_idx += 1
end
end
else
for col in eachcol(dfl_noon)
_noon_compose_helper(cols, _similar_left, left_idxs[col_idx],
col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil)
col_idx += 1
end
@assert col_idx == ncol(joiner.dfl) + 1
for col in eachcol(dfr_noon)
_noon_compose_helper(cols, _similar_right, col_idx, col, target_nrow,
right_ixs, lil + loil + 1, rightonly_ixs, roil)
col_idx += 1
end
end
else
for col in eachcol(dfl_noon)
_noon_compose_helper(cols, _similar_left, left_idxs[col_idx],
col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil)
col_idx += 1
end
@assert col_idx == ncol(joiner.dfl) + 1
for col in eachcol(dfr_noon)
_noon_compose_helper(cols, _similar_right, col_idx, col, target_nrow,
right_ixs, lil + loil + 1, rightonly_ixs, roil)
col_idx += 1
end
end

@assert col_idx == length(cols) + 1
Expand All @@ -233,6 +274,13 @@ function compose_joined_table(joiner::DataFrameJoiner, kind::Symbol, makeunique:
return res, src_indicator
end

function _noon_compose_helper(cols, similar_col, cols_i, col, target_nrow,
side_ixs, offset, sideonly_ixs, tocopy)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add types to the signature? Maybe also add a ! at the end of the function name.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK - fixed

cols[cols_i] = similar_col(col, target_nrow)
copyto!(cols[cols_i], view(col, side_ixs))
copyto!(cols[cols_i], offset, view(col, sideonly_ixs), 1, tocopy)
end

function _join(df1::AbstractDataFrame, df2::AbstractDataFrame;
on::Union{<:OnType, AbstractVector}, kind::Symbol, makeunique::Bool,
indicator::Union{Nothing, Symbol, AbstractString},
Expand Down
61 changes: 61 additions & 0 deletions test/join.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1496,4 +1496,65 @@ end
@test m1[!, :a] == m2[!, :a]
end

@testset "threaded correctness" begin
df1 = DataFrame(id=[1:10^6; 10^7+1:10^7+2])
df1.left_row = axes(df1, 1)
df2 = DataFrame(id=[1:10^6; 10^8+1:10^8+4])
df2.right_row = axes(df2, 1)

df_inner = DataFrame(id=1:10^6, left_row=1:10^6, right_row=1:10^6)
df_left = DataFrame(id=[1:10^6; 10^7+1:10^7+2], left_row=1:10^6+2,
right_row=[1:10^6; missing; missing])
df_right = DataFrame(id=[1:10^6; 10^8+1:10^8+4],
left_row=[1:10^6; fill(missing, 4)],
right_row=1:10^6+4)
df_outer = DataFrame(id=[1:10^6; 10^7+1:10^7+2; 10^8+1:10^8+4],
left_row=[1:10^6+2; fill(missing, 4)],
right_row=[1:10^6; missing; missing; 10^6+1:10^6+4])
df_semi = DataFrame(id=1:10^6, left_row=1:10^6)
df_anti = DataFrame(id=10^7+1:10^7+2, left_row=10^6+1:10^6+2)

@test innerjoin(df1, df2, on=:id) ≅ df_inner
@test leftjoin(df1, df2, on=:id) ≅ df_left
@test rightjoin(df1, df2, on=:id) ≅ df_right
@test outerjoin(df1, df2, on=:id) ≅ df_outer
@test semijoin(df1, df2, on=:id) ≅ df_semi
@test antijoin(df1, df2, on=:id) ≅ df_anti

Random.seed!(1234)
for i in 1:4
df1 = df1[shuffle(axes(df1, 1)), :]
df2 = df2[shuffle(axes(df2, 1)), :]
@test sort!(innerjoin(df1, df2, on=:id)) ≅ df_inner
@test sort!(leftjoin(df1, df2, on=:id)) ≅ df_left
@test sort!(rightjoin(df1, df2, on=:id)) ≅ df_right
@test sort!(outerjoin(df1, df2, on=:id)) ≅ df_outer
@test sort!(semijoin(df1, df2, on=:id)) ≅ df_semi
@test sort!(antijoin(df1, df2, on=:id)) ≅ df_anti
end

# test correctness of column order
df1 = DataFrame(a="a", id2=-[1:10^6; 10^7+1:10^7+2], b="b",
id1=[1:10^6; 10^7+1:10^7+2], c="c", d="d")
df2 = DataFrame(e="e", id1=[1:10^6; 10^8+1:10^8+4], f="f", g="g",
id2=-[1:10^6; 10^8+1:10^8+4], h="h")
@test innerjoin(df1, df2, on=[:id1, :id2]) ≅
DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6,
c="c", d="d", e="e", f="f", g="g", h="h")
@test leftjoin(df1, df2, on=[:id1, :id2])[1:10^6, :] ≅
DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6,
c="c", d="d", e="e", f="f", g="g", h="h")
@test rightjoin(df1, df2, on=[:id1, :id2])[1:10^6, :] ≅
DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6,
c="c", d="d", e="e", f="f", g="g", h="h")
@test outerjoin(df1, df2, on=[:id1, :id2])[1:10^6, :] ≅
DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6,
c="c", d="d", e="e", f="f", g="g", h="h")
@test semijoin(df1, df2, on=[:id1, :id2]) ≅
DataFrame(a="a", id2=-(1:10^6), b="b", id1=1:10^6, c="c", d="d")
@test antijoin(df1, df2, on=[:id1, :id2]) ≅
DataFrame(a="a", id2=-(10^7+1:10^7+2), b="b", id1=(10^7+1:10^7+2),
c="c", d="d")
end

end # module