Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transfer]update ray version and use internal core worker api #49

Merged
merged 2 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ubuntu_building.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
python3 -m virtualenv -p python3 py3
. py3/bin/activate
pip install pytest
pip install https://ray-mobius-us.oss-us-west-1.aliyuncs.com/ci/linux/ubuntu/0bb82f29b65dca348acf5aa516d21ef3f176a3e1/ray-2.0.0.dev0-cp38-cp38-linux_x86_64.whl
sh -c "bash scripts/install-ray.sh"
- name: Streaming python test
run: sh -c "bash streaming/buildtest.sh --test_categories=streaming_python"

Expand Down
2 changes: 1 addition & 1 deletion scripts/install-ray.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
minor_version=`python -c "import sys;print(sys.version_info[1])"`

if [ "$minor_version" == "8" ] ; then
python -m pip install https://ray-mobius-us.oss-us-west-1.aliyuncs.com/ci/linux/ubuntu/0bb82f29b65dca348acf5aa516d21ef3f176a3e1/ray-2.0.0.dev0-cp38-cp38-linux_x86_64.whl
python -m pip install https://ray-mobius-us.oss-us-west-1.aliyuncs.com/ci/linux/ubuntu/b7d148815e427ccd47046d1c69ee3ab55bf7db3c/ray-2.0.0.dev0-cp38-cp38-linux_x86_64.whl
elif [ "$minor_version" == "7" ] ; then
python -m pip install https://ray-mobius-us.oss-us-west-1.aliyuncs.com/ci/linux/ubuntu/0bb82f29b65dca348acf5aa516d21ef3f176a3e1/ray-2.0.0.dev0-cp37-cp37m-linux_x86_64.whl
elif [ "$minor_version" == "6"] ; then
Expand Down
4 changes: 2 additions & 2 deletions streaming/WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ workspace(name = "com_github_ray_streaming")

# LOAD RAY WORKSPACE
load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive", "http_file")
ray_version = "0bb82f29b65dca348acf5aa516d21ef3f176a3e1"
ray_version = "b7d148815e427ccd47046d1c69ee3ab55bf7db3c"
http_archive(
name="com_github_ray_project_ray",
strip_prefix = "ray-{}".format(ray_version),
urls = ["https://github.com/ray-project/ray/archive/{}.zip".format(ray_version)],
sha256 = "56b0bf6f0d5506be4bff694e9a3bedc6c8f3373b0ecd9f20fc2a25aed5efc569",
sha256 = "5de3e76a6208c4fc56f7f8a80275705d47b59124947865aa90b1ada96432c522",
)

http_archive(
Expand Down
8 changes: 5 additions & 3 deletions streaming/src/event_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <chrono>
#include <unordered_set>

#include "ray/internal/internal.h"

namespace ray {
namespace streaming {

Expand Down Expand Up @@ -117,7 +119,7 @@ Event &EventQueue::Front() {
}

EventService::EventService(uint32_t event_size)
: worker_id_(CoreWorkerProcess::IsInitialized()
: worker_id_(ray::internal::IsInitialized()
? CoreWorkerProcess::GetCoreWorker().GetWorkerID()
: WorkerID::Nil()),
event_queue_(std::make_shared<EventQueue>(event_size)),
Expand Down Expand Up @@ -170,8 +172,8 @@ void EventService::Execute(Event &event) {
}

void EventService::LoopThreadHandler() {
if (CoreWorkerProcess::IsInitialized()) {
CoreWorkerProcess::SetCurrentThreadWorkerId(worker_id_);
if (ray::internal::IsInitialized()) {
ray::internal::SetCurrentThreadWorker(worker_id_);
}
while (true) {
if (stop_flag_) {
Expand Down
6 changes: 4 additions & 2 deletions streaming/src/queue/queue_client.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include "ray/internal/internal.h"

#include "queue/queue_handler.h"
#include "queue/transport.h"

Expand All @@ -19,7 +21,7 @@ class ReaderClient {
/// function descriptor to be called by DataWriter, synchronous semantics
ReaderClient() {
downstream_handler_ = ray::streaming::DownstreamQueueMessageHandler::CreateService(
CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentActorID());
ray::internal::GetCurrentActorID());
}

/// Post buffer to downstream queue service, asynchronously.
Expand All @@ -38,7 +40,7 @@ class WriterClient {
public:
WriterClient() {
upstream_handler_ = ray::streaming::UpstreamQueueMessageHandler::CreateService(
CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentActorID());
ray::internal::GetCurrentActorID());
}

void OnWriterMessage(std::shared_ptr<LocalMemoryBuffer> buffer);
Expand Down