[Core] Initial port of Ray to Python 3.13 #47984

Merged · 24 commits · Feb 13, 2025
4 changes: 2 additions & 2 deletions bazel/ray_deps_setup.bzl
@@ -195,8 +195,8 @@ def ray_deps_setup():
auto_http_archive(
name = "cython",
build_file = True,
url = "https://github.com/cython/cython/archive/refs/tags/0.29.37.tar.gz",
sha256 = "824eb14045d85c5af677536134199dd6709db8fb0835452fd2d54bc3c8df8887",
url = "https://github.com/cython/cython/archive/refs/tags/3.0.12.tar.gz",
sha256 = "a156fff948c2013f2c8c398612c018e2b52314fdf0228af8fbdb5585e13699c2",
)

auto_http_archive(
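Cython is bumped from 0.29.37 to 3.0.12 because the 0.29 line predates CPython 3.13; building Ray's Cython extensions for Python 3.13 requires Cython 3.x. The sha256 field pins the exact tarball Bazel will accept. As a hypothetical one-off sanity check of the new pin (not part of ray_deps_setup.bzl, and it needs network access), one could run:

    import hashlib
    import urllib.request

    URL = "https://github.com/cython/cython/archive/refs/tags/3.0.12.tar.gz"
    EXPECTED = "a156fff948c2013f2c8c398612c018e2b52314fdf0228af8fbdb5585e13699c2"

    # Download the pinned archive and compare its digest to the value
    # recorded in ray_deps_setup.bzl.
    data = urllib.request.urlopen(URL).read()
    digest = hashlib.sha256(data).hexdigest()
    assert digest == EXPECTED, f"sha256 mismatch: got {digest}"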
3 changes: 2 additions & 1 deletion python/ray/_raylet.pxd
@@ -150,7 +150,8 @@ cdef class CoreWorker:
worker, outputs,
const CAddress &caller_address,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns,
CObjectID ref_generator_id=*)
ref_generator_id=*, # CObjectID
)
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle,
c_bool weak_ref)
cdef c_function_descriptors_to_python(
20 changes: 12 additions & 8 deletions python/ray/_raylet.pyx
@@ -440,7 +440,7 @@ class ObjectRefGenerator:

def _next_sync(
self,
timeout_s: Optional[float] = None
timeout_s: Optional[int | float] = None
) -> ObjectRef:
"""Waits for timeout_s and returns the object ref if available.

@@ -509,7 +509,7 @@

async def _next_async(
self,
timeout_s: Optional[float] = None
timeout_s: Optional[int | float] = None
):
"""Same API as _next_sync, but it is for async context."""
core_worker = self.worker.core_worker
@@ -1529,7 +1529,7 @@ cdef create_generator_return_obj(
worker, [output],
caller_address,
&intermediate_result,
generator_id)
generator_id.Binary())

return_object[0] = intermediate_result.back()

@@ -1657,7 +1657,7 @@ cdef execute_dynamic_generator_and_store_task_outputs(
worker, generator,
caller_address,
dynamic_returns,
generator_id)
generator_id.Binary())
except Exception as error:
is_retryable_error[0] = determine_if_retryable(
should_retry_exceptions,
@@ -4322,18 +4322,22 @@ cdef class CoreWorker:
const CAddress &caller_address,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]]
*returns,
CObjectID ref_generator_id=CObjectID.Nil()):
ref_generator_id=None):
cdef:
CObjectID return_id
size_t data_size
shared_ptr[CBuffer] metadata
c_vector[CObjectID] contained_id
int64_t task_output_inlined_bytes
int64_t num_returns = -1
CObjectID c_ref_generator_id = CObjectID.Nil()
shared_ptr[CRayObject] *return_ptr

if ref_generator_id:
c_ref_generator_id = CObjectID.FromBinary(ref_generator_id)

num_outputs_stored = 0
if not ref_generator_id.IsNil():
if not c_ref_generator_id.IsNil():
# The task specified a dynamic number of return values. Determine
# the expected number of return values.
if returns[0].size() > 0:
@@ -4403,15 +4407,15 @@

if not self.store_task_output(
serialized_object, return_id,
ref_generator_id,
c_ref_generator_id,
data_size, metadata, contained_id, caller_address,
&task_output_inlined_bytes, return_ptr):
# If the object already exists, but we fail to pin the copy, it
# means the existing copy might've gotten evicted. Try to
# create another copy.
self.store_task_output(
serialized_object, return_id,
ref_generator_id,
c_ref_generator_id,
data_size, metadata,
contained_id, caller_address, &task_output_inlined_bytes,
return_ptr)
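The store_task_outputs changes above all follow one pattern: rather than declaring the generator ID as a C++-typed parameter with a C++ default (CObjectID ref_generator_id=CObjectID.Nil()), the method now takes a plain Python argument holding the ID's binary string (or None) and rebuilds the CObjectID inside the body, and callers pass generator_id.Binary(). Condensed from the hunks above (not the full signature or body), the new shape is roughly:

    cdef store_task_outputs(
            self, worker, outputs,
            const CAddress &caller_address,
            c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns,
            ref_generator_id=None):  # binary form of a CObjectID, or None
        cdef CObjectID c_ref_generator_id = CObjectID.Nil()
        if ref_generator_id:
            c_ref_generator_id = CObjectID.FromBinary(ref_generator_id)
        # ... the rest of the body uses c_ref_generator_id ...

    # Call sites pass the binary form instead of the CObjectID itself:
    core_worker.store_task_outputs(
        worker, [output], caller_address, &intermediate_result,
        generator_id.Binary())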
28 changes: 14 additions & 14 deletions python/ray/includes/gcs_client.pxi
@@ -16,7 +16,7 @@ Binding of C++ ray::gcs::GcsClient.
#
# For how async API are implemented, see src/ray/gcs/gcs_client/python_callbacks.h
from asyncio import Future
from typing import List
from typing import List, Sequence
from libcpp.utility cimport move
import concurrent.futures
from ray.includes.common cimport (
@@ -277,7 +277,7 @@ cdef class InnerGcsClient:
# NodeInfo methods
#############################################################
def check_alive(
self, node_ips: List[bytes], timeout: Optional[float] = None
self, node_ips: List[bytes], timeout: Optional[int | float] = None
) -> List[bool]:
cdef:
int64_t timeout_ms = round(1000 * timeout) if timeout else -1
@@ -290,7 +290,7 @@ cdef class InnerGcsClient:
return raise_or_return(convert_multi_bool(status, move(results)))

def async_check_alive(
self, node_ips: List[bytes], timeout: Optional[float] = None
self, node_ips: List[bytes], timeout: Optional[int | float] = None
) -> Future[List[bool]]:
cdef:
int64_t timeout_ms = round(1000 * timeout) if timeout else -1
@@ -307,7 +307,7 @@ cdef class InnerGcsClient:
return asyncio.wrap_future(fut)

def drain_nodes(
self, node_ids: List[bytes], timeout: Optional[float] = None
self, node_ids: Sequence[bytes], timeout: Optional[int | float] = None
) -> List[bytes]:
"""returns a list of node_ids that are successfully drained."""
cdef:
@@ -316,14 +316,14 @@ def drain_nodes(
c_vector[c_string] results
CRayStatus status
for node_id in node_ids:
c_node_ids.push_back(CNodeID.FromBinary(node_id))
c_node_ids.push_back(<CNodeID>CUniqueID.FromBinary(node_id))
with nogil:
status = self.inner.get().Nodes().DrainNodes(
c_node_ids, timeout_ms, results)
return raise_or_return(convert_multi_str(status, move(results)))

def get_all_node_info(
self, timeout: Optional[float] = None
self, timeout: Optional[int | float] = None
) -> Dict[NodeID, gcs_pb2.GcsNodeInfo]:
cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1
cdef c_vector[CGcsNodeInfo] reply
@@ -333,7 +333,7 @@ def get_all_node_info(
return raise_or_return(convert_get_all_node_info(status, move(reply)))

def async_get_all_node_info(
self, node_id: Optional[NodeID] = None, timeout: Optional[float] = None
self, node_id: Optional[NodeID] = None, timeout: Optional[int | float] = None
) -> Future[Dict[NodeID, gcs_pb2.GcsNodeInfo]]:
cdef:
int64_t timeout_ms = round(1000 * timeout) if timeout else -1
@@ -356,7 +356,7 @@ def async_get_all_node_info(
# NodeResources methods
#############################################################
def get_all_resource_usage(
self, timeout: Optional[float] = None
self, timeout: Optional[int | float] = None
) -> GetAllResourceUsageReply:
cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1
cdef CGetAllResourceUsageReply c_reply
@@ -381,7 +381,7 @@ def get_all_resource_usage(
actor_id: Optional[ActorID] = None,
job_id: Optional[JobID] = None,
actor_state_name: Optional[str] = None,
timeout: Optional[float] = None
timeout: Optional[int | float] = None
) -> Future[Dict[ActorID, gcs_pb2.ActorTableData]]:
cdef:
int64_t timeout_ms = round(1000 * timeout) if timeout else -1
@@ -409,7 +409,7 @@

def async_kill_actor(
self, actor_id: ActorID, c_bool force_kill, c_bool no_restart,
timeout: Optional[float] = None
timeout: Optional[int | float] = None
) -> ConcurrentFuture[None]:
"""
On success: returns None.
@@ -439,7 +439,7 @@
self, *, job_or_submission_id: Optional[str] = None,
skip_submission_job_info_field: bool = False,
skip_is_running_tasks_field: bool = False,
timeout: Optional[float] = None
timeout: Optional[int | float] = None
) -> Dict[JobID, gcs_pb2.JobTableData]:
cdef c_string c_job_or_submission_id
cdef optional[c_string] c_optional_job_or_submission_id = nullopt
@@ -462,7 +462,7 @@
self, *, job_or_submission_id: Optional[str] = None,
skip_submission_job_info_field: bool = False,
skip_is_running_tasks_field: bool = False,
timeout: Optional[float] = None
timeout: Optional[int | float] = None
) -> Future[Dict[JobID, gcs_pb2.JobTableData]]:
cdef:
c_string c_job_or_submission_id
@@ -507,7 +507,7 @@
#############################################################
def request_cluster_resource_constraint(
self,
bundles: c_vector[unordered_map[c_string, double]],
bundles: c_vector[unordered_map[c_string, cython.double]],
count_array: c_vector[int64_t],
timeout_s=None):
cdef:
@@ -629,7 +629,7 @@ cdef incremented_fut():
cpython.Py_INCREF(fut)
return fut

cdef void assign_and_decrement_fut(result, fut) with gil:
cdef void assign_and_decrement_fut(result, fut) noexcept with gil:
assert isinstance(fut, concurrent.futures.Future)

assert not fut.done()
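Two smaller Cython 3 adjustments also appear in this file. Timeout parameters are widened to Optional[int | float] so callers passing integer seconds still type-check, and assign_and_decrement_fut gains noexcept. Under Cython 3, a cdef function with a non-object return type propagates Python exceptions by default, which changes the function-pointer type the C++ side in python_callbacks.h sees; declaring the callback noexcept presumably restores the plain, exception-free signature it expects. A minimal sketch of the rule, with a hypothetical callback name:

    # Without "noexcept", Cython 3 gives this cdef void function an implicit
    # exception specification, so its type no longer matches a plain C
    # function pointer; "noexcept" keeps the Cython 0.29-style behavior.
    cdef void on_done(result, fut) noexcept with gil:
        fut.set_result(result)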
8 changes: 4 additions & 4 deletions python/ray/includes/global_state_accessor.pxi
@@ -195,7 +195,7 @@ cdef class GlobalStateAccessor:

def get_worker_info(self, worker_id):
cdef unique_ptr[c_string] worker_info
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
with nogil:
worker_info = self.inner.get().GetWorkerInfo(cworker_id)
if worker_info:
@@ -211,14 +211,14 @@ cdef class GlobalStateAccessor:

def get_worker_debugger_port(self, worker_id):
cdef c_uint32_t result
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
with nogil:
result = self.inner.get().GetWorkerDebuggerPort(cworker_id)
return result

def update_worker_debugger_port(self, worker_id, debugger_port):
cdef c_bool result
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
cdef c_uint32_t cdebugger_port = debugger_port
with nogil:
result = self.inner.get().UpdateWorkerDebuggerPort(
@@ -228,7 +228,7 @@ cdef class GlobalStateAccessor:

def update_worker_num_paused_threads(self, worker_id, num_paused_threads_delta):
cdef c_bool result
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
cdef c_int32_t cnum_paused_threads_delta = num_paused_threads_delta

with nogil:
2 changes: 1 addition & 1 deletion python/ray/includes/object_ref.pxi
@@ -71,7 +71,7 @@ cdef class ObjectRef(BaseID):
pass

cdef CObjectID native(self):
return self.data
return <CObjectID>self.data

def binary(self):
return self.data.Binary()
53 changes: 13 additions & 40 deletions python/ray/includes/unique_ids.pxd
@@ -2,20 +2,13 @@ from libcpp cimport bool as c_bool
from libcpp.string cimport string as c_string
from libc.stdint cimport uint8_t, uint32_t, int64_t

# Note: we removed the staticmethod declarations in
# https://github.com/ray-project/ray/pull/47984 due
# to a compiler bug in Cython 3.0.x -- we should see
# if we can bring them back in Cython 3.1.x if the
# bug is fixed.
cdef extern from "ray/common/id.h" namespace "ray" nogil:
cdef cppclass CBaseID[T]:
@staticmethod
T FromBinary(const c_string &binary)

@staticmethod
T FromHex(const c_string &hex_str)

@staticmethod
const T Nil()

@staticmethod
size_t Size()

size_t Hash() const
c_bool IsNil() const
c_bool operator==(const CBaseID &rhs) const
@@ -25,7 +18,7 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
c_string Binary() const
c_string Hex() const

cdef cppclass CUniqueID "ray::UniqueID"(CBaseID):
cdef cppclass CUniqueID "ray::UniqueID"(CBaseID[CUniqueID]):
CUniqueID()

@staticmethod
@@ -40,13 +33,7 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
@staticmethod
const CUniqueID Nil()

@staticmethod
size_t Size()

cdef cppclass CActorClassID "ray::ActorClassID"(CUniqueID):

@staticmethod
CActorClassID FromBinary(const c_string &binary)
cdef cppclass CActorClassID "ray::ActorClassID"(CBaseID[CActorClassID]):

@staticmethod
CActorClassID FromHex(const c_string &hex_str)
@@ -71,26 +58,18 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:

CJobID JobId()

cdef cppclass CNodeID "ray::NodeID"(CUniqueID):

@staticmethod
CNodeID FromBinary(const c_string &binary)
cdef cppclass CNodeID "ray::NodeID"(CBaseID[CNodeID]):

@staticmethod
CNodeID FromHex(const c_string &hex_str)

@staticmethod
const CNodeID Nil()

cdef cppclass CConfigID "ray::ConfigID"(CUniqueID):

@staticmethod
CConfigID FromBinary(const c_string &binary)

cdef cppclass CFunctionID "ray::FunctionID"(CUniqueID):
cdef cppclass CConfigID "ray::ConfigID"(CBaseID[CConfigID]):
pass

@staticmethod
CFunctionID FromBinary(const c_string &binary)
cdef cppclass CFunctionID "ray::FunctionID"(CBaseID[CFunctionID]):

@staticmethod
CFunctionID FromHex(const c_string &hex_str)
@@ -175,10 +154,7 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:

CTaskID TaskId() const

cdef cppclass CClusterID "ray::ClusterID"(CUniqueID):

@staticmethod
CClusterID FromBinary(const c_string &binary)
cdef cppclass CClusterID "ray::ClusterID"(CBaseID[CClusterID]):

@staticmethod
CClusterID FromHex(const c_string &hex_str)
@@ -189,10 +165,7 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
@staticmethod
const CClusterID Nil()

cdef cppclass CWorkerID "ray::WorkerID"(CUniqueID):

@staticmethod
CWorkerID FromBinary(const c_string &binary)
cdef cppclass CWorkerID "ray::WorkerID"(CBaseID[CWorkerID]):

@staticmethod
CWorkerID FromHex(const c_string &hex_str)
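Net effect of this header change: the static factories (FromBinary, FromHex, Nil, Size) are declared on a new templated CBaseID[T] base (CRTP-style), the concrete ID classes now inherit from CBaseID[Self] rather than CUniqueID, and their per-class FromBinary declarations are dropped. With those gone, and given the Cython 3.0.x compiler bug called out in the comment at the top of the file, the .pxi call sites in this PR build IDs through the untemplated CUniqueID and cast to the concrete type, for example:

    # Call-site pattern used in gcs_client.pxi and global_state_accessor.pxi above:
    cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
    c_node_ids.push_back(<CNodeID>CUniqueID.FromBinary(node_id))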