diff --git a/python/ray/ray_perf.py b/python/ray/ray_perf.py index bcda187926ba..eae62e396fd3 100644 --- a/python/ray/ray_perf.py +++ b/python/ray/ray_perf.py @@ -1,5 +1,6 @@ """This is the script for `ray microbenchmark`.""" +import asyncio import os import time import numpy as np @@ -22,6 +23,18 @@ def small_value_batch(self, n): ray.get([small_value.remote() for _ in range(n)]) +@ray.remote +class AsyncActor: + async def small_value(self): + return b"ok" + + async def small_value_with_arg(self, x): + return b"ok" + + async def small_value_batch(self, n): + await asyncio.wait([small_value.remote() for _ in range(n)]) + + @ray.remote(num_cpus=0) class Client: def __init__(self, servers): @@ -190,6 +203,51 @@ def actor_multi2_direct_arg(): timeit("n:n actor calls with arg async", actor_multi2_direct_arg, n * len(clients)) + a = AsyncActor.remote() + + def actor_sync(): + ray.get(a.small_value.remote()) + + timeit("1:1 async-actor calls sync", actor_sync) + + a = AsyncActor.remote() + + def async_actor(): + ray.get([a.small_value.remote() for _ in range(1000)]) + + timeit("1:1 async-actor calls async", async_actor, 1000) + + a = AsyncActor.remote() + + def async_actor(): + ray.get([a.small_value_with_arg.remote(i) for i in range(1000)]) + + timeit("1:1 async-actor calls with args async", async_actor, 1000) + + n = 5000 + n_cpu = multiprocessing.cpu_count() // 2 + actors = [AsyncActor.remote() for _ in range(n_cpu)] + client = Client.remote(actors) + + def async_actor_async(): + ray.get(client.small_value_batch.remote(n)) + + timeit("1:n async-actor calls async", async_actor_async, n * len(actors)) + + n = 5000 + m = 4 + n_cpu = multiprocessing.cpu_count() // 2 + a = [AsyncActor.remote() for _ in range(n_cpu)] + + @ray.remote + def async_actor_work(actors): + ray.get([actors[i % n_cpu].small_value.remote() for i in range(n)]) + + def async_actor_multi(): + ray.get([async_actor_work.remote(a) for _ in range(m)]) + + timeit("n:n async-actor calls async", async_actor_multi, m * n) + if __name__ == "__main__": main()