From 517efd57fa35f6f47fc92aa6d0963a3b74809f64 Mon Sep 17 00:00:00 2001 From: Richard Date: Thu, 9 Mar 2017 01:14:34 -0800 Subject: [PATCH 1/4] backend ui work --- webui/backend/ray_ui.py | 43 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/webui/backend/ray_ui.py b/webui/backend/ray_ui.py index e28815256704..0d67921ed148 100644 --- a/webui/backend/ray_ui.py +++ b/webui/backend/ray_ui.py @@ -50,6 +50,7 @@ async def hgetall_as_dict(redis_conn, key): # Cache information about the local schedulers. local_schedulers = {} +errors = collections.defaultdict(list) def duration_to_string(duration): """Format a duration in seconds as a string. @@ -125,6 +126,28 @@ async def handle_get_drivers(websocket, redis_conn): reply = sorted(drivers, key=(lambda driver: driver["start time"]))[::-1] await websocket.send(json.dumps(reply)) +async def listen_for_errors(redis_ip_address, redis_port): + pubsub_conn = await aioredis.create_connection((redis_ip_address, redis_port), loop=loop) + data_conn = await aioredis.create_connection((redis_ip_address, redis_port), loop=loop) + + error_pattern = "__keyspace@0__:ErrorKeys" + psub = await pubsub_conn.execute_pubsub("psubscribe", error_pattern) + channel = pubsub_conn.pubsub_patterns[error_pattern] + print("Listening for error messages...") + while (await channel.wait_message()): + info = await data_conn.execute("lrange", "ErrorKeys", 0, -1) + for error_key in info: + worker, task = key_to_hex_identifiers(error_key) + # TODO: Filter out workers so that only relevant task errors are necessary + result = await data_conn.execute("hget", error_key, "message") + result = result.decode("ascii") + # TODO: Maybe also get rid of coloring? + errors[worker].append(result) + +async def handle_get_errors(websocket): + """Renders error messages""" + await websocket.send(json.dumps(errors)) + node_info = collections.OrderedDict() worker_info = collections.OrderedDict() @@ -245,12 +268,20 @@ async def send_heartbeats(websocket, redis_conn): continue local_schedulers[local_scheduler_id]["last_heartbeat"] = time.time() -async def serve_requests(websocket, path): - redis_conn = await aioredis.create_connection((redis_ip_address, redis_port), loop=loop) +async def initialize(redis_ip_address, redis_port): + """Open up ports to listen for new updates from Redis""" + # TODO(richard): A lot of code needs to be ported in order to open + # new websockets. + + asyncio.ensure_future(listen_for_errors(redis_ip_address, redis_port)) + + +async def serve_requests(websocket, path): # We loop infinitely because otherwise the websocket will be closed. # TODO(rkn): Maybe we should open a new web sockets for every request instead # of looping here. + redis_conn = await aioredis.create_connection((redis_ip_address, redis_port), loop=loop) while True: command = json.loads(await websocket.recv()) print("received command {}".format(command)) @@ -261,6 +292,8 @@ async def serve_requests(websocket, path): await handle_get_drivers(websocket, redis_conn) elif command["command"] == "get-recent-tasks": await handle_get_recent_tasks(websocket, redis_conn, command["num"]) + elif command["command"] == "get-errors": + await handle_get_errors(websocket) elif command["command"] == "get-heartbeats": await send_heartbeats(websocket, redis_conn) @@ -336,8 +369,10 @@ async def serve_requests(websocket, path): # The port here must match the value used by the frontend to connect over # websockets. - port = 8888 - start_server = websockets.serve(serve_requests, "localhost", port) + port = 8888 #TODO: Automatically incrementation of port if taken already + loop.run_until_complete(initialize(redis_ip_address, redis_port)) + + start_server = websockets.serve(serve_requests, "localhost", port) loop.run_until_complete(start_server) loop.run_forever() From 18c8e53b3ee2888fd0fb541cfe614f99f6d69b2d Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 10 Mar 2017 16:07:01 -0800 Subject: [PATCH 2/4] instll block --- webui/backend/ray_ui.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/webui/backend/ray_ui.py b/webui/backend/ray_ui.py index 0d67921ed148..202b41976217 100644 --- a/webui/backend/ray_ui.py +++ b/webui/backend/ray_ui.py @@ -134,8 +134,11 @@ async def listen_for_errors(redis_ip_address, redis_port): psub = await pubsub_conn.execute_pubsub("psubscribe", error_pattern) channel = pubsub_conn.pubsub_patterns[error_pattern] print("Listening for error messages...") + index = 0 while (await channel.wait_message()): - info = await data_conn.execute("lrange", "ErrorKeys", 0, -1) + msg = await channel.get() # instills block??? + info = await data_conn.execute("lrange", "ErrorKeys", index, -1) + for error_key in info: worker, task = key_to_hex_identifiers(error_key) # TODO: Filter out workers so that only relevant task errors are necessary @@ -143,6 +146,7 @@ async def listen_for_errors(redis_ip_address, redis_port): result = result.decode("ascii") # TODO: Maybe also get rid of coloring? errors[worker].append(result) + index += 1 async def handle_get_errors(websocket): """Renders error messages""" From 211c747c7c7131a52dec164e7f20a3d52b602aa5 Mon Sep 17 00:00:00 2001 From: Richard Date: Fri, 10 Mar 2017 18:00:42 -0800 Subject: [PATCH 3/4] easy mvp --- webui/backend/ray_ui.py | 8 +++--- webui/src/ray-app.html | 2 ++ webui/src/ray-errors.html | 56 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 3 deletions(-) create mode 100644 webui/src/ray-errors.html diff --git a/webui/backend/ray_ui.py b/webui/backend/ray_ui.py index 202b41976217..2e84d1f1fd7f 100644 --- a/webui/backend/ray_ui.py +++ b/webui/backend/ray_ui.py @@ -50,7 +50,7 @@ async def hgetall_as_dict(redis_conn, key): # Cache information about the local schedulers. local_schedulers = {} -errors = collections.defaultdict(list) +errors = [] def duration_to_string(duration): """Format a duration in seconds as a string. @@ -145,11 +145,13 @@ async def listen_for_errors(redis_ip_address, redis_port): result = await data_conn.execute("hget", error_key, "message") result = result.decode("ascii") # TODO: Maybe also get rid of coloring? - errors[worker].append(result) + errors.append({"worker_id": worker, + "task_id": task, + "error": result}) index += 1 async def handle_get_errors(websocket): - """Renders error messages""" + """Renders error messages""" await websocket.send(json.dumps(errors)) node_info = collections.OrderedDict() diff --git a/webui/src/ray-app.html b/webui/src/ray-app.html index 60a748b861f1..8daf6dbcf0f5 100644 --- a/webui/src/ray-app.html +++ b/webui/src/ray-app.html @@ -75,6 +75,7 @@ Objects Tasks Events + Errors Timeline Recent Tasks @@ -100,6 +101,7 @@ + diff --git a/webui/src/ray-errors.html b/webui/src/ray-errors.html new file mode 100644 index 000000000000..c057424fb11d --- /dev/null +++ b/webui/src/ray-errors.html @@ -0,0 +1,56 @@ + + + + + + + + + + From 20c0fde0875b964f6806edd4b4896fbddd0b258a Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sat, 11 Mar 2017 14:55:57 -0800 Subject: [PATCH 4/4] Small changes. --- webui/backend/ray_ui.py | 28 ++++++++++++++-------------- webui/index.html | 10 ---------- webui/src/ray-app.html | 10 ---------- webui/src/ray-errors.html | 12 +----------- webui/src/ray-events.html | 10 ---------- webui/src/ray-icons.html | 12 +----------- webui/src/ray-objects.html | 10 ---------- webui/src/ray-overview.html | 10 ---------- webui/src/ray-tasks.html | 10 ---------- webui/src/ray-timeline.html | 10 ---------- 10 files changed, 16 insertions(+), 106 deletions(-) diff --git a/webui/backend/ray_ui.py b/webui/backend/ray_ui.py index 2e84d1f1fd7f..b8c107551f99 100644 --- a/webui/backend/ray_ui.py +++ b/webui/backend/ray_ui.py @@ -136,22 +136,23 @@ async def listen_for_errors(redis_ip_address, redis_port): print("Listening for error messages...") index = 0 while (await channel.wait_message()): - msg = await channel.get() # instills block??? + msg = await channel.get() info = await data_conn.execute("lrange", "ErrorKeys", index, -1) for error_key in info: worker, task = key_to_hex_identifiers(error_key) - # TODO: Filter out workers so that only relevant task errors are necessary + # TODO(richard): Filter out workers so that only relevant task errors are + # necessary. result = await data_conn.execute("hget", error_key, "message") result = result.decode("ascii") - # TODO: Maybe also get rid of coloring? - errors.append({"worker_id": worker, + # TODO(richard): Maybe also get rid of the coloring. + errors.append({"driver_id": worker, "task_id": task, "error": result}) index += 1 async def handle_get_errors(websocket): - """Renders error messages""" + """Send error messages to the frontend.""" await websocket.send(json.dumps(errors)) node_info = collections.OrderedDict() @@ -274,15 +275,13 @@ async def send_heartbeats(websocket, redis_conn): continue local_schedulers[local_scheduler_id]["last_heartbeat"] = time.time() -async def initialize(redis_ip_address, redis_port): - """Open up ports to listen for new updates from Redis""" - # TODO(richard): A lot of code needs to be ported in order to open - # new websockets. +async def cache_data_from_redis(redis_ip_address, redis_port): + """Open up ports to listen for new updates from Redis.""" + # TODO(richard): A lot of code needs to be ported in order to open new + # websockets. asyncio.ensure_future(listen_for_errors(redis_ip_address, redis_port)) - - async def serve_requests(websocket, path): # We loop infinitely because otherwise the websocket will be closed. # TODO(rkn): Maybe we should open a new web sockets for every request instead @@ -374,10 +373,11 @@ async def serve_requests(websocket, path): redis_ip_address, redis_port = redis_address[0], int(redis_address[1]) # The port here must match the value used by the frontend to connect over - # websockets. - port = 8888 #TODO: Automatically incrementation of port if taken already + # websockets. TODO(richard): Automatically increment the port if it is already + # taken. + port = 8888 - loop.run_until_complete(initialize(redis_ip_address, redis_port)) + loop.run_until_complete(cache_data_from_redis(redis_ip_address, redis_port)) start_server = websockets.serve(serve_requests, "localhost", port) loop.run_until_complete(start_server) diff --git a/webui/index.html b/webui/index.html index 97d1ed5fed21..5ff9cdc8c3ba 100644 --- a/webui/index.html +++ b/webui/index.html @@ -1,13 +1,3 @@ - - diff --git a/webui/src/ray-app.html b/webui/src/ray-app.html index 8daf6dbcf0f5..83e94e75e374 100644 --- a/webui/src/ray-app.html +++ b/webui/src/ray-app.html @@ -1,13 +1,3 @@ - - diff --git a/webui/src/ray-errors.html b/webui/src/ray-errors.html index c057424fb11d..c671b4bebd33 100644 --- a/webui/src/ray-errors.html +++ b/webui/src/ray-errors.html @@ -1,13 +1,3 @@ - - @@ -26,7 +16,7 @@

Errors

- + diff --git a/webui/src/ray-events.html b/webui/src/ray-events.html index 454ca92413ba..17ebece1ebf7 100644 --- a/webui/src/ray-events.html +++ b/webui/src/ray-events.html @@ -1,13 +1,3 @@ - - diff --git a/webui/src/ray-icons.html b/webui/src/ray-icons.html index fcd758c80269..8b93b1bc2798 100644 --- a/webui/src/ray-icons.html +++ b/webui/src/ray-icons.html @@ -1,13 +1,3 @@ - - @@ -28,4 +18,4 @@ - \ No newline at end of file + diff --git a/webui/src/ray-objects.html b/webui/src/ray-objects.html index d7581a13d3f3..47ee968ea214 100644 --- a/webui/src/ray-objects.html +++ b/webui/src/ray-objects.html @@ -1,13 +1,3 @@ - - diff --git a/webui/src/ray-overview.html b/webui/src/ray-overview.html index 4362afb3ac76..5df612c70935 100644 --- a/webui/src/ray-overview.html +++ b/webui/src/ray-overview.html @@ -1,13 +1,3 @@ - - diff --git a/webui/src/ray-tasks.html b/webui/src/ray-tasks.html index 0703f7060909..4c8c9a0a0027 100644 --- a/webui/src/ray-tasks.html +++ b/webui/src/ray-tasks.html @@ -1,13 +1,3 @@ - - diff --git a/webui/src/ray-timeline.html b/webui/src/ray-timeline.html index 5ad8b7b984e7..49375a10861a 100644 --- a/webui/src/ray-timeline.html +++ b/webui/src/ray-timeline.html @@ -1,13 +1,3 @@ - -