[Serve] Add serve test config files and wrk dependency (ray-project#1…
jiaodong authored Jun 28, 2021
1 parent be1f6d5 commit 6aeda62
Showing 3 changed files with 97 additions and 31 deletions.
2 changes: 2 additions & 0 deletions release/long_running_tests/app_config.yaml
@@ -2,6 +2,7 @@ base_image: "anyscale/ray-ml:1.4.0"
env_vars: {}
debian_packages:
- curl
- unzip

python:
pip_packages:
@@ -13,3 +14,4 @@ post_build_cmds:
- pip3 install numpy || true
- pip3 install -U {{ env["RAY_WHEELS"] | default("ray") }}
- pip3 install -U ray[all] gym[atari]
- 'rm -r wrk || true && git clone https://github.com/wg/wrk.git /tmp/wrk && cd /tmp/wrk && make -j && sudo cp wrk /usr/local/bin'
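
The post_build_cmds step above builds wrk from source and copies the binary into /usr/local/bin so the load-testing workloads can shell out to it. A minimal sketch, not part of this commit, of a guard a workload script could use to fail fast when that build step did not run; the helper name is hypothetical:

import shutil


def assert_wrk_installed() -> str:
    """Return the path to the wrk binary, or raise if the post_build_cmds
    step in app_config.yaml did not install it."""
    wrk_path = shutil.which("wrk")
    if wrk_path is None:
        raise RuntimeError(
            "wrk not found on PATH; check post_build_cmds in app_config.yaml")
    return wrk_path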
86 changes: 67 additions & 19 deletions release/long_running_tests/workloads/serve.py
@@ -1,5 +1,6 @@
import json
import os
import re
import time
import subprocess
from subprocess import PIPE
@@ -10,38 +11,54 @@
from ray import serve
from ray.cluster_utils import Cluster

# Global variables / constants appear only right after imports.
# Ray serve deployment setup constants
NUM_REPLICAS = 7
MAX_BATCH_SIZE = 16

# Cluster setup constants
NUM_REDIS_SHARDS = 1
REDIS_MAX_MEMORY = 10**8
OBJECT_STORE_MEMORY = 10**8
NUM_NODES = 4

# wrk setup constants (might want to make these configurable ?)
NUM_CONNECTIONS = int(NUM_REPLICAS * MAX_BATCH_SIZE * 0.75)
NUM_THREADS = 2
# Append and print every 5mins for quick status polling as well
# as time series plotting
TIME_PER_CYCLE = "5m"


def update_progress(result):
"""
Write test result json to /tmp/, which will be read from
anyscale product runs in each releaser test
"""
result["last_update"] = time.time()
test_output_json = os.environ.get("TEST_OUTPUT_JSON",
"/tmp/release_test_output.json")
with open(test_output_json, "wt") as f:
with open(test_output_json, "at") as f:
json.dump(result, f)
f.write("\n")


num_redis_shards = 1
redis_max_memory = 10**8
object_store_memory = 10**8
num_nodes = 4
cluster = Cluster()
for i in range(num_nodes):
for i in range(NUM_NODES):
cluster.add_node(
redis_port=6379 if i == 0 else None,
num_redis_shards=num_redis_shards if i == 0 else None,
num_redis_shards=NUM_REDIS_SHARDS if i == 0 else None,
num_cpus=8,
num_gpus=0,
resources={str(i): 2},
object_store_memory=object_store_memory,
redis_max_memory=redis_max_memory,
object_store_memory=OBJECT_STORE_MEMORY,
redis_max_memory=REDIS_MAX_MEMORY,
dashboard_host="0.0.0.0",
)

ray.init(address=cluster.address, dashboard_host="0.0.0.0")
serve.start()

NUM_REPLICAS = 7
MAX_BATCH_SIZE = 16


@serve.deployment(name="echo", num_replicas=NUM_REPLICAS)
class Echo:
@@ -62,27 +79,58 @@ async def __call__(self, request):
print(resp)
time.sleep(0.5)

connections = int(NUM_REPLICAS * MAX_BATCH_SIZE * 0.75)
num_threads = 2
time_to_run = "60m"
print("Started load testing with the following config: ")
print(f"num_connections: {NUM_CONNECTIONS}")
print(f"num_threads: {NUM_THREADS}")
print(f"time_per_cycle: {TIME_PER_CYCLE}")

while True:
proc = subprocess.Popen(
[
"wrk",
"-c",
str(connections),
str(NUM_CONNECTIONS),
"-t",
str(num_threads),
str(NUM_THREADS),
"-d",
time_to_run,
TIME_PER_CYCLE,
"http://127.0.0.1:8000/echo",
],
stdout=PIPE,
stderr=PIPE,
)
print("started load testing")
proc.wait()
out, err = proc.communicate()

# Sample wrk stdout:
#
# Running 10s test @ http://127.0.0.1:8000/echo
# 2 threads and 84 connections
# Thread Stats Avg Stdev Max +/- Stdev
# Latency 59.33ms 13.51ms 113.83ms 64.20%
# Req/Sec 709.16 61.73 848.00 78.50%
# 14133 requests in 10.02s, 2.08MB read
# Requests/sec: 1410.71
# Transfer/sec: 212.16KB
metrics_dict = {}
for line in out.decode().splitlines():
parsed = re.split(r"\s+", line.strip())
if parsed[0] == "Latency":
metrics_dict["latency_avg"] = parsed[1]
metrics_dict["latency_stdev"] = parsed[2]
metrics_dict["latency_max"] = parsed[3]
metrics_dict["latency_+/-_stdev"] = parsed[4]
elif parsed[0] == "Req/Sec":
metrics_dict["req/sec_avg"] = parsed[1]
metrics_dict["req/sec_stdev"] = parsed[2]
metrics_dict["req/sec_max"] = parsed[3]
metrics_dict["req/sec_+/-_stdev"] = parsed[4]
elif parsed[0] == "Requests/sec:":
metrics_dict["requests/sec"] = parsed[1]
elif parsed[0] == "Transfer/sec:":
metrics_dict["transfer/sec"] = parsed[1]

print(out.decode())
print(err.decode())

update_progress(metrics_dict)
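
Because update_progress now opens the output file in append mode and writes one JSON object per line, each 5-minute wrk cycle adds a new record instead of overwriting the previous one. A minimal sketch, not part of this commit, of reading those records back for the time-series plotting mentioned in the comments above; it assumes the default /tmp/release_test_output.json path:

import json

records = []
with open("/tmp/release_test_output.json") as f:
    for line in f:
        line = line.strip()
        if line:
            records.append(json.loads(line))

# Each record holds the parsed wrk metrics plus a "last_update" timestamp,
# e.g. {"latency_avg": "59.33ms", "requests/sec": "1410.71", "last_update": ...}
for record in records:
    print(record.get("last_update"), record.get("requests/sec"))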
40 changes: 28 additions & 12 deletions release/long_running_tests/workloads/serve_failure.py
@@ -10,35 +10,51 @@
from ray import serve
from ray.cluster_utils import Cluster

# Global variables / constants appear only right after imports.
# Ray serve deployment setup constants
NUM_REPLICAS = 7
MAX_BATCH_SIZE = 16

# Cluster setup constants
NUM_REDIS_SHARDS = 1
REDIS_MAX_MEMORY = 10**8
OBJECT_STORE_MEMORY = 10**8
NUM_NODES = 4

# RandomTest setup constants
CPUS_PER_NODE = 10


def update_progress(result):
"""
Write test result json to /tmp/, which will be read from
anyscale product runs in each releaser test
"""
result["last_update"] = time.time()
test_output_json = os.environ.get("TEST_OUTPUT_JSON",
"/tmp/release_test_output.json")
with open(test_output_json, "wt") as f:
json.dump(result, f)


num_redis_shards = 1
redis_max_memory = 10**8
object_store_memory = 10**8
num_nodes = 4
cpus_per_node = 10
cluster = Cluster()
for i in range(num_nodes):
for i in range(NUM_NODES):
cluster.add_node(
redis_port=6379 if i == 0 else None,
num_redis_shards=num_redis_shards if i == 0 else None,
num_redis_shards=NUM_REDIS_SHARDS if i == 0 else None,
num_cpus=16,
num_gpus=0,
resources={str(i): 2},
object_store_memory=object_store_memory,
redis_max_memory=redis_max_memory,
object_store_memory=OBJECT_STORE_MEMORY,
redis_max_memory=REDIS_MAX_MEMORY,
dashboard_host="0.0.0.0",
)

ray.init(
address=cluster.address, dashboard_host="0.0.0.0", log_to_driver=False)
namespace="serve_failure_test",
address=cluster.address,
dashboard_host="0.0.0.0",
log_to_driver=False)
serve.start(detached=True)


@@ -79,7 +95,7 @@ def __init__(self, max_deployments=1):
def create_deployment(self):
if len(self.deployments) == self.max_deployments:
deployment_to_delete = self.deployments.pop()
serve.delete_deployment(deployment_to_delete)
serve.get_deployment(deployment_to_delete).delete()

new_name = "".join(
[random.choice(string.ascii_letters) for _ in range(10)])
@@ -131,4 +147,4 @@ def run(self):

random_killer = RandomKiller.remote()
random_killer.run.remote()
RandomTest(max_deployments=num_nodes * cpus_per_node).run()
RandomTest(max_deployments=NUM_NODES * CPUS_PER_NODE).run()
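
serve_failure.py now evicts via serve.get_deployment(name).delete() instead of serve.delete_deployment(...), and caps churn at NUM_NODES * CPUS_PER_NODE = 40 concurrent deployments. A standalone sketch of that bounded create-and-evict pattern, not the file's actual RandomTest class; it assumes serve.start() has already been called, and the BoundedDeploymentPool name is hypothetical:

import random
import string

from ray import serve


def _random_name() -> str:
    return "".join(random.choice(string.ascii_letters) for _ in range(10))


class BoundedDeploymentPool:
    """Keep at most max_deployments Serve deployments alive at once."""

    def __init__(self, max_deployments: int):
        self.max_deployments = max_deployments
        self.names = []

    def create_one(self):
        # Evict an existing deployment once the cap is reached, using the
        # same delete call the diff above switches to.
        if len(self.names) == self.max_deployments:
            serve.get_deployment(self.names.pop()).delete()

        name = _random_name()

        @serve.deployment(name=name)
        def handler(request):
            return name

        handler.deploy()
        self.names.append(name)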
