-
Notifications
You must be signed in to change notification settings - Fork 371
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support threading in joined column creation #2664
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -96,8 +96,21 @@ function compose_inner_table(joiner::DataFrameJoiner, | |
left_rename::Union{Function, AbstractString, Symbol}, | ||
right_rename::Union{Function, AbstractString, Symbol}) | ||
left_ixs, right_ixs = find_inner_rows(joiner) | ||
dfl = joiner.dfl[left_ixs, :] | ||
dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)] | ||
|
||
@static if VERSION >= v"1.4" | ||
if Threads.nthreads() > 1 && length(left_ixs) >= 1_000_000 | ||
dfl_task = Threads.@spawn joiner.dfl[left_ixs, :] | ||
dfr_noon_task = Threads.@spawn joiner.dfr[right_ixs, Not(joiner.right_on)] | ||
dfl = fetch(dfl_task) | ||
dfr_noon = fetch(dfr_noon_task) | ||
else | ||
dfl = joiner.dfl[left_ixs, :] | ||
dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)] | ||
end | ||
else | ||
dfl = joiner.dfl[left_ixs, :] | ||
dfr_noon = joiner.dfr[right_ixs, Not(joiner.right_on)] | ||
end | ||
|
||
ncleft = ncol(dfl) | ||
cols = Vector{AbstractVector}(undef, ncleft + ncol(dfr_noon)) | ||
|
@@ -207,21 +220,49 @@ function compose_joined_table(joiner::DataFrameJoiner, kind::Symbol, makeunique: | |
|
||
@assert col_idx == ncol(joiner.dfl_on) + 1 | ||
|
||
for col in eachcol(dfl_noon) | ||
cols_i = left_idxs[col_idx] | ||
cols[cols_i] = _similar_left(col, target_nrow) | ||
copyto!(cols[cols_i], view(col, left_ixs)) | ||
copyto!(cols[cols_i], lil + 1, view(col, leftonly_ixs), 1, loil) | ||
col_idx += 1 | ||
end | ||
|
||
@assert col_idx == ncol(joiner.dfl) + 1 | ||
|
||
for col in eachcol(dfr_noon) | ||
cols[col_idx] = _similar_right(col, target_nrow) | ||
copyto!(cols[col_idx], view(col, right_ixs)) | ||
copyto!(cols[col_idx], lil + loil + 1, view(col, rightonly_ixs), 1, roil) | ||
col_idx += 1 | ||
@static if VERSION >= v"1.4" | ||
if Threads.nthreads() > 1 && target_nrow >= 1_000_000 && length(cols) > col_idx | ||
@sync begin | ||
for col in eachcol(dfl_noon) | ||
cols_i = left_idxs[col_idx] | ||
Threads.@spawn _noon_compose_helper(cols, _similar_left, cols_i, | ||
col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil) | ||
col_idx += 1 | ||
end | ||
@assert col_idx == ncol(joiner.dfl) + 1 | ||
for col in eachcol(dfr_noon) | ||
let cols_i = col_idx | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why do you use `let` […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You are right - […] |
||
Threads.@spawn _noon_compose_helper(cols, _similar_right, cols_i, col, target_nrow, | ||
right_ixs, lil + loil + 1, rightonly_ixs, roil) | ||
end | ||
bkamins marked this conversation as resolved.
Show resolved
Hide resolved
|
||
col_idx += 1 | ||
end | ||
end | ||
else | ||
for col in eachcol(dfl_noon) | ||
_noon_compose_helper(cols, _similar_left, left_idxs[col_idx], | ||
col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil) | ||
col_idx += 1 | ||
end | ||
@assert col_idx == ncol(joiner.dfl) + 1 | ||
for col in eachcol(dfr_noon) | ||
_noon_compose_helper(cols, _similar_right, col_idx, col, target_nrow, | ||
right_ixs, lil + loil + 1, rightonly_ixs, roil) | ||
col_idx += 1 | ||
end | ||
end | ||
else | ||
for col in eachcol(dfl_noon) | ||
_noon_compose_helper(cols, _similar_left, left_idxs[col_idx], | ||
col, target_nrow, left_ixs, lil + 1, leftonly_ixs, loil) | ||
col_idx += 1 | ||
end | ||
@assert col_idx == ncol(joiner.dfl) + 1 | ||
for col in eachcol(dfr_noon) | ||
_noon_compose_helper(cols, _similar_right, col_idx, col, target_nrow, | ||
right_ixs, lil + loil + 1, rightonly_ixs, roil) | ||
col_idx += 1 | ||
end | ||
end | ||
|
||
@assert col_idx == length(cols) + 1 | ||
|
@@ -233,6 +274,13 @@ function compose_joined_table(joiner::DataFrameJoiner, kind::Symbol, makeunique: | |
return res, src_indicator | ||
end | ||
|
||
function _noon_compose_helper(cols, similar_col, cols_i, col, target_nrow, | ||
side_ixs, offset, sideonly_ixs, tocopy) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you add types to the signature? Maybe also add a […] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK - fixed |
||
cols[cols_i] = similar_col(col, target_nrow) | ||
copyto!(cols[cols_i], view(col, side_ixs)) | ||
copyto!(cols[cols_i], offset, view(col, sideonly_ixs), 1, tocopy) | ||
end | ||
|
||
function _join(df1::AbstractDataFrame, df2::AbstractDataFrame; | ||
on::Union{<:OnType, AbstractVector}, kind::Symbol, makeunique::Bool, | ||
indicator::Union{Nothing, Symbol, AbstractString}, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not add threading for the creation of on-columns: in practice I expect on-columns to be a minority of the created columns, and this way I avoid adding too much boilerplate code.