
Shuffle Service #5976

Closed · wants to merge 101 commits
Changes from 82 commits
9deb8c6
Move pandas groupby outside of event loop
mrocklin Mar 13, 2022
3f62911
Add MultiFile prototype
mrocklin Mar 13, 2022
626eda0
Integrate MultiFile with shuffle extension
mrocklin Mar 13, 2022
7f48f77
Add buffered comms
mrocklin Mar 14, 2022
99bb283
Move multi files to shuffle/
mrocklin Mar 14, 2022
0d49ab9
add arrow
mrocklin Mar 15, 2022
1e1311d
Handle buffers manually in multi_file
mrocklin Mar 15, 2022
b99329a
Pass around only bytes
mrocklin Mar 15, 2022
5781b7e
Clean up a few extra copies
mrocklin Mar 16, 2022
0ce6e01
Let comms continue without blocking on disk
mrocklin Mar 16, 2022
3204997
Move flush into multi_file.read
mrocklin Mar 16, 2022
8b11d6d
Avoid multiple accesses to the same file
mrocklin Mar 16, 2022
62cc43d
Change configuration for smoother single-machine use
mrocklin Mar 16, 2022
5a64248
Fix up some concurrency issues
mrocklin Mar 16, 2022
b901613
Fix shard size accountiing
mrocklin Mar 16, 2022
24092bb
add more connections if more workers
mrocklin Mar 16, 2022
7fbe4aa
Allow worker extensions to piggy-back on heartbeat
mrocklin Mar 17, 2022
89f6347
Merge branch 'heartbeat-extensions' into p2p-shuffle
mrocklin Mar 17, 2022
bc81db8
Remove file cache
mrocklin Mar 17, 2022
27d2ab3
First pass on adding a Scheduler extension and worker heartbeat
mrocklin Mar 17, 2022
9fd6da0
Name scheduler extensions
mrocklin Mar 17, 2022
8641abb
Merge branch 'heartbeat-extensions' into p2p-shuffle
mrocklin Mar 17, 2022
34617db
fixup test
mrocklin Mar 17, 2022
e1c0a4d
Add timing and diagnostics
mrocklin Mar 17, 2022
8c28b83
fixup tests
mrocklin Mar 17, 2022
29253b4
Use names for client extensions
mrocklin Mar 18, 2022
1f79575
Add back in manual addition of stealing extension
mrocklin Mar 18, 2022
cf9a939
Merge branch 'heartbeat-extensions' into p2p-shuffle
mrocklin Mar 18, 2022
6f2286e
Add basic shuffling dashboard
mrocklin Mar 18, 2022
efedc04
Merge branch 'main' of github.com:dask/distributed into heartbeat-ext…
mrocklin Mar 18, 2022
9b7b03b
Merge branch 'heartbeat-extensions' into p2p-shuffle
mrocklin Mar 18, 2022
456de23
Add colors to shuffling plots
mrocklin Mar 18, 2022
aef2f61
make larger dashboard page
mrocklin Mar 18, 2022
f79e923
extend shuffling dashboard
mrocklin Mar 18, 2022
1e0256f
Don't offload file writes
mrocklin Mar 18, 2022
97fb09c
reduce comm memory limit
mrocklin Mar 18, 2022
76baf4b
Merge branch 'main' into heartbeat-extensions
mrocklin Mar 18, 2022
f58b2e9
use multi-threaded thread-pool and swap np.unique for pd.Series.unique
mrocklin Mar 19, 2022
9bc6ce6
removeme: check state of extensions in test
mrocklin Mar 19, 2022
57b4a42
Merge branch 'heartbeat-extensions' of github.com:mrocklin/distribute…
mrocklin Mar 19, 2022
1309c22
I think that there is some strange SchedulerState interation going on
mrocklin Mar 19, 2022
c894f40
Track Event Loop intervals in dashboard plot
mrocklin Mar 19, 2022
486320d
Grey out unseen workers
mrocklin Mar 19, 2022
6419328
flake8
mrocklin Mar 19, 2022
9e6aadc
remove old test
mrocklin Mar 19, 2022
c202385
Merge branch 'main' of github.com:dask/distributed into p2p-shuffle
mrocklin Mar 19, 2022
cf51784
Merge branch 'event-loop-dashboard' into p2p-shuffle
mrocklin Mar 19, 2022
c272458
bump y-axis, add kwargs
mrocklin Mar 19, 2022
1d114c2
Merge branch 'event-loop-dashboard' into p2p-shuffle
mrocklin Mar 19, 2022
4ba9923
Add event loop figure to shuffling page
mrocklin Mar 19, 2022
e7a5143
Remove errant print
mrocklin Mar 21, 2022
b0cd7ae
Add test for the compute chain
mrocklin Mar 21, 2022
bd37f49
Simplify MultiComm and add docstrings
mrocklin Mar 22, 2022
6e1af62
Add close method to extensions
mrocklin Mar 22, 2022
7776ecb
Merge branch 'heartbeat-extensions' into p2p-shuffle
mrocklin Mar 22, 2022
107b5a0
Add close method to ShuffleWorkerExtension
mrocklin Mar 22, 2022
dc8a7a4
clean up old methods
mrocklin Mar 22, 2022
6ef62a0
Move multi-shuffle state to class level
mrocklin Mar 22, 2022
8550a13
Speed up tests
mrocklin Mar 22, 2022
2e01aa8
move multicomm queue to class level
mrocklin Mar 22, 2022
4486584
add docstrings and cleanup communicate future
mrocklin Mar 22, 2022
01403b9
Update distributed/stealing.py
mrocklin Mar 22, 2022
97fdf2a
use nonlocal
mrocklin Mar 22, 2022
0bd1f89
Merge branch 'heartbeat-extensions' of github.com:mrocklin/distribute…
mrocklin Mar 22, 2022
182dc83
Merge branch 'main' of github.com:dask/distributed into heartbeat-ext…
mrocklin Mar 23, 2022
007ea90
Update distributed/shuffle/multi_file.py
mrocklin Mar 23, 2022
d27d0f3
Merge branch 'main' of github.com:dask/distributed into heartbeat-ext…
mrocklin Mar 25, 2022
b50c61a
Merge branch 'heartbeat-extensions' into p2p-shuffle
mrocklin Mar 25, 2022
dfc31fd
Merge branch 'p2p-shuffle' of github.com:mrocklin/distributed into p2…
mrocklin Mar 25, 2022
ea000a3
cleanup hover
mrocklin Mar 25, 2022
4598577
Use weakkeydicitonary to handle multiple queues
mrocklin Mar 25, 2022
2993643
Add total_size to class level
mrocklin Mar 25, 2022
fe61116
make dashboard robust to missing workers
mrocklin Mar 25, 2022
adccb02
tests pass
mrocklin Mar 25, 2022
3aabef0
depend on pyarrow in CI
mrocklin Mar 25, 2022
2af4974
install dask@p2p-shuffle
mrocklin Mar 25, 2022
1756fb2
simplify dashboard charts
mrocklin Mar 25, 2022
6694c84
Move arrow utilities over to a separate file
mrocklin Mar 28, 2022
154b21f
Merge branch 'main' of github.com:dask/distributed into p2p-shuffle
mrocklin Mar 28, 2022
36956ef
Merge branch 'main' of github.com:dask/distributed into p2p-shuffle
mrocklin Mar 29, 2022
2ab401a
make multi_file tests pass
mrocklin Mar 29, 2022
90673d1
Add test for MultiComm
mrocklin Mar 29, 2022
7d8954a
Respond to feedback
mrocklin Mar 31, 2022
1796804
Drop runtime dependency to setuptools (#6017)
crusaderky Mar 29, 2022
8de2793
More idiomatic mypy configuration (#6022)
crusaderky Mar 30, 2022
caa852f
Python 3.10 (#5952)
graingert Mar 30, 2022
f3fb682
Cluster Dump SchedulerPlugin (#5983)
sjperkins Mar 30, 2022
0a1761d
Add tiny test for ToPickle (#6021)
mrocklin Mar 30, 2022
7cdb56f
Update gpuCI `RAPIDS_VER` to `22.06` (#5962)
github-actions[bot] Mar 30, 2022
d0afbb1
Retry on transient error codes in preload (#5982)
mrocklin Mar 30, 2022
a74fd38
Remove support for PyPy (#6029)
jrbourbeau Mar 31, 2022
bde718f
Make test_reconnect async (#6000)
mrocklin Mar 31, 2022
dd857b8
Short variant of test_report.html (#6034)
crusaderky Mar 31, 2022
9efb27c
Add test for bad disk
mrocklin Mar 31, 2022
69bed31
Support exceptions in MultiComm
mrocklin Mar 31, 2022
ec71091
add unit tests for exceptions
mrocklin Mar 31, 2022
016ed25
cleanup files properly
mrocklin Mar 31, 2022
8ee5605
cleanup extra futures
mrocklin Mar 31, 2022
1c3bfb9
Merge branch 'main' of github.com:dask/distributed into p2p-shuffle
mrocklin Mar 31, 2022
fa235ee
Support windows in tests (hopefully)
mrocklin Mar 31, 2022
df4348f
Merge branch 'main' of github.com:dask/distributed into p2p-shuffle
mrocklin Apr 8, 2022
4 changes: 4 additions & 0 deletions .github/workflows/tests.yaml
@@ -78,6 +78,10 @@ jobs:
if: ${{ matrix.os == 'windows-latest' && matrix.python-version == '3.9' }}
run: mamba uninstall ipython

- name: Install dask branch
shell: bash -l {0}
run: python -m pip install --no-deps git+https://github.com/mrocklin/dask@p2p-shuffle

Comment on lines +78 to +81

Member: Is this already in?

Member Author: Not yet: dask/dask#8836

- name: Install
shell: bash -l {0}
run: |
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -52,3 +52,4 @@ repos:
- dask
- tornado
- zict
- pyarrow
1 change: 1 addition & 0 deletions continuous_integration/environment-3.8.yaml
@@ -26,6 +26,7 @@ dependencies:
- pre-commit
- prometheus_client
- psutil
- pyarrow=7
- pytest
- pytest-asyncio<0.14.0
- pytest-cov
1 change: 1 addition & 0 deletions continuous_integration/environment-3.9.yaml
@@ -27,6 +27,7 @@ dependencies:
- pre-commit
- prometheus_client
- psutil
- pyarrow=7
- pynvml # Only tested here
- pytest
- pytest-asyncio<0.14.0
296 changes: 294 additions & 2 deletions distributed/dashboard/components/scheduler.py
@@ -1043,7 +1043,7 @@ class SystemTimeseries(DashboardComponent):
from ws.metrics["val"] for ws in scheduler.workers.values() divided by number of workers.
"""

def __init__(self, scheduler, **kwargs):
def __init__(self, scheduler, follow_interval=20000, **kwargs):
with log_errors():
self.scheduler = scheduler
self.source = ColumnDataSource(
@@ -1060,7 +1060,9 @@ def __init__(self, scheduler, **kwargs):

update(self.source, self.get_data())

x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0)
x_range = DataRange1d(
follow="end", follow_interval=follow_interval, range_padding=0
)
tools = "reset, xpan, xwheel_zoom"

self.bandwidth = figure(
@@ -3487,6 +3489,261 @@ def update(self):
self.source.data.update(data)


class Shuffling(DashboardComponent):
"""Shuffle buffer state (comms and disk) per worker"""

def __init__(self, scheduler, **kwargs):
with log_errors():
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"worker": [],
"y": [],
"comm_memory": [],
"comm_memory_limit": [],
"comm_buckets": [],
"comm_active": [],
"comm_avg_duration": [],
"comm_avg_size": [],
"comm_read": [],
"comm_written": [],
"comm_color": [],
"disk_memory": [],
"disk_memory_limit": [],
"disk_buckets": [],
"disk_active": [],
"disk_avg_duration": [],
"disk_avg_size": [],
"disk_read": [],
"disk_written": [],
"disk_color": [],
}
)
self.totals_source = ColumnDataSource(
{
"x": ["Network Send", "Network Receive", "Disk Write", "Disk Read"],
"values": [0, 0, 0, 0],
}
)

self.comm_memory = figure(
title="Comms Buffer",
tools="",
toolbar_location="above",
x_range=Range1d(0, 100_000_000),
**kwargs,
)
self.comm_memory.hbar(
source=self.source,
right="comm_memory",
y="y",
height=0.9,
color="comm_color",
)
hover = HoverTool(
tooltips=[
("Memory Used", "@comm_memory{0.00 b}"),
("Average Write", "@comm_avg_size{0.00 b}"),
("# Buckets", "@comm_buckets"),
("Average Duration", "@comm_avg_duration"),
],
formatters={"@comm_avg_duration": "datetime"},
mode="hline",
)
self.comm_memory.add_tools(hover)
self.comm_memory.x_range.start = 0
self.comm_memory.x_range.end = 1
self.comm_memory.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")

self.disk_memory = figure(
title="Disk Buffer",
tools="",
toolbar_location="above",
x_range=Range1d(0, 100_000_000),
**kwargs,
)
self.disk_memory.yaxis.visible = False

self.disk_memory.hbar(
source=self.source,
right="disk_memory",
y="y",
height=0.9,
color="disk_color",
)

hover = HoverTool(
tooltips=[
("Memory Used", "@disk_memory{0.00 b}"),
("Average Write", "@disk_avg_size{0.00 b}"),
("# Buckets", "@disk_buckets"),
("Average Duration", "@disk_avg_duration"),
],
formatters={"@disk_avg_duration": "datetime"},
mode="hline",
)
self.disk_memory.add_tools(hover)
self.disk_memory.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b")

titles = ["Network Send", "Network Receive", "Disk Write", "Disk Read"]
self.totals = figure(
x_range=titles,
title="Totals",
toolbar_location=None,
tools="",
**kwargs,
)

self.totals.vbar(
x="x",
top="values",
width=0.9,
source=self.totals_source,
)

self.totals.xgrid.grid_line_color = None
self.totals.y_range.start = 0
self.totals.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b")

hover = HoverTool(
tooltips=[("Total", "@values{0.00b}")],
mode="vline",
)
self.totals.add_tools(hover)

self.root = row(self.comm_memory, self.disk_memory)

@without_property_validation
def update(self):
with log_errors():
input = self.scheduler.extensions["shuffle"].shuffles
if not input:
return

input = list(input.values())[-1] # TODO: multiple concurrent shuffles

data = {
"worker": [],
"y": [],
"comm_memory": [],
"comm_memory_limit": [],
"comm_buckets": [],
"comm_active": [],
"comm_avg_duration": [],
"comm_avg_size": [],
"comm_read": [],
"comm_written": [],
"comm_color": [],
"disk_memory": [],
"disk_memory_limit": [],
"disk_buckets": [],
"disk_active": [],
"disk_avg_duration": [],
"disk_avg_size": [],
"disk_read": [],
"disk_written": [],
"disk_color": [],
}
now = time()

for i, (worker, d) in enumerate(input.items()):
data["y"].append(i)
data["worker"].append(worker)
data["comm_memory"].append(d["comms"]["memory"])
data["comm_memory_limit"].append(d["comms"]["memory_limit"])
data["comm_buckets"].append(d["comms"]["buckets"])
data["comm_active"].append(d["comms"]["active"])
data["comm_avg_duration"].append(
d["comms"]["diagnostics"].get("avg_duration", 0)
)
data["comm_avg_size"].append(
d["comms"]["diagnostics"].get("avg_size", 0)
)
data["comm_read"].append(d["comms"]["read"])
data["comm_written"].append(d["comms"]["written"])
try:
if self.scheduler.workers[worker].last_seen < now - 5:
data["comm_color"].append("gray")
elif d["comms"]["active"]:
data["comm_color"].append("green")
elif d["comms"]["memory"] > d["comms"]["memory_limit"]:
data["comm_color"].append("red")
else:
data["comm_color"].append("blue")
except KeyError:
data["comm_color"].append("black")

data["disk_memory"].append(d["disk"]["memory"])
data["disk_memory_limit"].append(d["disk"]["memory_limit"])
data["disk_buckets"].append(d["disk"]["buckets"])
data["disk_active"].append(d["disk"]["active"])
data["disk_avg_duration"].append(
d["disk"]["diagnostics"].get("avg_duration", 0)
)
data["disk_avg_size"].append(
d["disk"]["diagnostics"].get("avg_size", 0)
)
data["disk_read"].append(d["disk"]["read"])
data["disk_written"].append(d["disk"]["written"])
try:
if self.scheduler.workers[worker].last_seen < now - 5:
data["disk_color"].append("gray")
elif d["disk"]["active"]:
data["disk_color"].append("green")
elif d["disk"]["memory"] > d["disk"]["memory_limit"]:
data["disk_color"].append("red")
else:
data["disk_color"].append("blue")
except KeyError:
data["disk_color"].append("black")

"""
singletons = {
"comm_avg_duration": [
sum(data["comm_avg_duration"]) / len(data["comm_avg_duration"])
],
"comm_avg_size": [
sum(data["comm_avg_size"]) / len(data["comm_avg_size"])
],
"disk_avg_duration": [
sum(data["disk_avg_duration"]) / len(data["disk_avg_duration"])
],
"disk_avg_size": [
sum(data["disk_avg_size"]) / len(data["disk_avg_size"])
],
}
singletons["comm_avg_bandwidth"] = [
singletons["comm_avg_size"][0] / singletons["comm_avg_duration"][0]
]
singletons["disk_avg_bandwidth"] = [
singletons["disk_avg_size"][0] / singletons["disk_avg_duration"][0]
]
singletons["y"] = [data["y"][-1] / 2]
"""

totals = {
"x": ["Network Send", "Network Receive", "Disk Write", "Disk Read"],
"values": [
sum(data["comm_written"]),
sum(data["comm_read"]),
sum(data["disk_written"]),
sum(data["disk_read"]),
],
}
update(self.totals_source, totals)

update(self.source, data)
limit = max(data["comm_memory_limit"] + data["disk_memory_limit"]) * 1.2
self.comm_memory.x_range.end = limit
self.disk_memory.x_range.end = limit


class SchedulerLogs:
def __init__(self, scheduler, start=None):
logs = scheduler.get_logs(start=start, timestamps=True)
@@ -3531,6 +3788,41 @@ def systemmonitor_doc(scheduler, extra, doc):
doc.theme = BOKEH_THEME


def shuffling_doc(scheduler, extra, doc):
with log_errors():
doc.title = "Dask: Shuffling"

shuffling = Shuffling(scheduler, width=400, height=400)
workers_memory = WorkersMemory(scheduler, width=400, height=400)
timeseries = SystemTimeseries(
scheduler, width=1600, height=200, follow_interval=3000
)
event_loop = EventLoop(scheduler, width=200, height=400)

add_periodic_callback(doc, shuffling, 200)
add_periodic_callback(doc, workers_memory, 200)
add_periodic_callback(doc, timeseries, 500)
add_periodic_callback(doc, event_loop, 500)

timeseries.bandwidth.y_range = timeseries.disk.y_range

doc.add_root(
column(
row(
workers_memory.root,
shuffling.comm_memory,
shuffling.disk_memory,
shuffling.totals,
event_loop.root,
),
row(column(timeseries.bandwidth, timeseries.disk)),
)
)
doc.template = env.get_template("simple.html")
doc.template_variables.update(extra)
doc.theme = BOKEH_THEME


def stealing_doc(scheduler, extra, doc):
with log_errors():
occupancy = Occupancy(scheduler)
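The per-worker coloring in `Shuffling.update` above encodes buffer health in four states. A minimal standalone sketch of that rule (the function name `buffer_color` is illustrative, not part of the PR; thresholds are taken from the diff):

```python
def buffer_color(last_seen: float, now: float, active: int,
                 memory: int, memory_limit: int) -> str:
    """Sketch of the dashboard's per-worker color rule.

    gray  -> worker heartbeat is more than 5 seconds stale
    green -> transfers currently active
    red   -> buffer has grown past its memory limit
    blue  -> idle and within limits
    """
    if last_seen < now - 5:
        return "gray"
    if active:
        return "green"
    if memory > memory_limit:
        return "red"
    return "blue"
```

The same rule is applied twice in the diff, once for the comm buffer and once for the disk buffer, with a `KeyError` fallback to black when the worker is no longer known to the scheduler.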
2 changes: 2 additions & 0 deletions distributed/dashboard/scheduler.py
@@ -37,6 +37,7 @@
individual_profile_server_doc,
profile_doc,
profile_server_doc,
shuffling_doc,
status_doc,
stealing_doc,
systemmonitor_doc,
@@ -49,6 +50,7 @@

applications = {
"/system": systemmonitor_doc,
"/shuffle": shuffling_doc,
"/stealing": stealing_doc,
"/workers": workers_doc,
"/events": events_doc,
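The `applications` mapping above is a plain route table: each URL path keys a function that populates a Bokeh document for that dashboard page. A minimal sketch of the dispatch pattern (handler names and the dict-based `doc` are illustrative stand-ins, not the real distributed/Bokeh API):

```python
# Illustrative route table in the style of distributed's dashboard registry.
def system_doc(scheduler, extra, doc):
    doc["title"] = "System"

def shuffle_doc(scheduler, extra, doc):
    doc["title"] = "Shuffling"

applications = {
    "/system": system_doc,
    "/shuffle": shuffle_doc,
}

def dispatch(path, scheduler, extra):
    """Look up the handler for a path and let it populate a fresh doc."""
    doc = {}
    applications[path](scheduler, extra, doc)
    return doc
```

Adding a new dashboard page, as this PR does with `/shuffle`, is then just one import plus one entry in the mapping.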