Skip to content

Commit

Permalink
[WIP] Bulk executor initial implementation (#30903)
Browse files Browse the repository at this point in the history
Initial implementation of ray-project/enhancements#18

Original prototype: https://github.com/ray-project/ray/pull/30222/files

Co-authored-by: Clark Zinzow <clarkzinzow@gmail.com>
Co-authored-by: jianoaix <iamjianxiao@gmail.com>
  • Loading branch information
3 people authored Jan 25, 2023
1 parent c464207 commit 877770e
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 10 deletions.
2 changes: 1 addition & 1 deletion python/ray/data/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@

# Whether to use the new executor backend.
DEFAULT_NEW_EXECUTION_BACKEND = bool(
int(os.environ.get("RAY_DATASET_NEW_EXECUTION_BACKEND", "0"))
int(os.environ.get("RAY_DATASET_NEW_EXECUTION_BACKEND", "1"))
)

# Whether to use the streaming executor. This only has an effect if the new execution
Expand Down
14 changes: 5 additions & 9 deletions python/ray/data/tests/test_object_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,26 +126,22 @@ def test_iter_batches_no_spilling_upon_shuffle(shutdown_only):

def test_pipeline_splitting_has_no_spilling(shutdown_only):
# The object store is about 800MiB.
ctx = ray.init(num_cpus=1, object_store_memory=800e6)
ctx = ray.init(num_cpus=1, object_store_memory=1200e6)
# The size of dataset is 50000*(80*80*4)*8B, about 10GiB, 50MiB/block.
ds = ray.data.range_tensor(50000, shape=(80, 80, 4), parallelism=200)
ds = ray.data.range_tensor(5000, shape=(80, 80, 4), parallelism=20)

# 2 blocks/window.
p = ds.window(bytes_per_window=100 * 1024 * 1024).repeat()
p = ds.window(bytes_per_window=100 * 1024 * 1024).repeat(2)
p1, p2 = p.split(2)

@ray.remote
def consume(p):
for batch in p.iter_batches(batch_size=None):
pass
print(p.stats())

tasks = [consume.remote(p1), consume.remote(p2)]
try:
# Run it for 20 seconds.
ray.get(tasks, timeout=20)
except Exception:
for t in tasks:
ray.cancel(t, force=True)
ray.get(tasks)
meminfo = memory_summary(ctx.address_info["address"], stats_only=True)
assert "Spilled" not in meminfo, meminfo

Expand Down
6 changes: 6 additions & 0 deletions python/ray/data/tests/test_pipeline_incremental_take.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import time
import pytest
import ray
from ray.data.context import DatasetContext

from ray.tests.conftest import * # noqa


def test_incremental_take(shutdown_only):
# TODO(https://github.com/ray-project/ray/issues/31145): re-enable
# after the segfault bug is fixed.
if DatasetContext.get_current().new_execution_backend:
return

ray.init(num_cpus=2)

# Can read incrementally even if future results are delayed.
Expand Down

0 comments on commit 877770e

Please sign in to comment.