[Core] Initial port of Ray to Python 3.13 (ray-project#47984)

## Why are these changes needed?

This is the first step towards ray-project#47933.

It is not yet thoroughly tested on Python 3.13, but it compiles locally
(with `pip install -e . --verbose`) and can execute a simple workload
like
```
>>> import ray
>>> ray.init()
2024-10-10 16:03:31,857	INFO worker.py:1799 -- Started a local Ray instance.
RayContext(dashboard_url='', python_version='3.13.0', ray_version='3.0.0.dev0', ray_commit='{{RAY_COMMIT_SHA}}')
>>> @ray.remote
... def f():
...     return 42
...     
>>> ray.get(f.remote())
42
>>> 
```
(and similar for actors).
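For reference, a minimal actor-based smoke test along the same lines (an illustrative sketch, not output captured from this PR):

```
import ray

ray.init()

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

# A single round trip through the actor's method should return 1.
counter = Counter.remote()
assert ray.get(counter.increment.remote()) == 1
```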

The main thing that needed to change to make Ray work on Python 3.13 was
upgrading Cython to 3.0.11, which seems to be the first version of
Cython to support Python 3.13. Unfortunately, it has a compiler bug
(cython/cython#3235; the fix is not released yet) that I had to work
around.
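To sketch the workaround (simplified from `python/ray/includes/unique_ids.pxd` in this PR): the bug bites `@staticmethod` declarations on a templated `cdef extern` C++ class, so the static factory declarations were dropped from the `CBaseID[T]` template, and call sites construct IDs via the concrete `CUniqueID` and cast:

```
from libcpp cimport bool as c_bool
from libcpp.string cimport string as c_string

cdef extern from "ray/common/id.h" namespace "ray" nogil:
    cdef cppclass CBaseID[T]:
        # Declaring `@staticmethod T FromBinary(...)` here is what
        # triggers the Cython 3.0.x miscompilation, so the static
        # factories now live only on concrete subclasses.
        c_string Binary() const
        c_bool IsNil() const

    cdef cppclass CUniqueID "ray::UniqueID"(CBaseID[CUniqueID]):
        @staticmethod
        CUniqueID FromBinary(const c_string &binary)

    cdef cppclass CNodeID "ray::NodeID"(CBaseID[CNodeID]):
        pass

# Call sites go through CUniqueID and cast to the subclass type,
# e.g. in gcs_client.pxi:
#     c_node_ids.push_back(<CNodeID>CUniqueID.FromBinary(node_id))
```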

I also had to work around cython/cython#5750
by changing some type annotations from `float` to `int | float`.
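As I understand it, the issue is that under Cython 3 an `Optional[float]` annotation is enforced as "`float` or `None`", so passing an `int` (for example `timeout=5`) raises a `TypeError` instead of being accepted per PEP 484's numeric-tower convention. A minimal sketch of the pattern used throughout this PR (hypothetical function names):

```
from typing import Optional

# Before: when compiled with Cython 3, an int argument such as
# timeout_s=5 fails the generated "float or None" check
# (cython/cython#5750).
def wait_before(timeout_s: Optional[float] = None): ...

# After: widening the annotation accepts both ints and floats.
def wait_after(timeout_s: Optional[int | float] = None): ...
```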

## Related issue number


## Checks

- [ ] I've signed off every commit (by using the `-s` flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a method in Tune, I've added it in `doc/source/tune/api/` under
the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Philipp Moritz <pcmoritz@gmail.com>
Co-authored-by: pcmoritz <pcmoritz@anyscale.com>
Co-authored-by: srinathk10 <68668616+srinathk10@users.noreply.github.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
4 people authored Feb 13, 2025
1 parent 68c0ead commit d2004b6
Showing 9 changed files with 55 additions and 77 deletions.
4 changes: 2 additions & 2 deletions bazel/ray_deps_setup.bzl
@@ -195,8 +195,8 @@ def ray_deps_setup():
auto_http_archive(
name = "cython",
build_file = True,
url = "https://github.com/cython/cython/archive/refs/tags/0.29.37.tar.gz",
sha256 = "824eb14045d85c5af677536134199dd6709db8fb0835452fd2d54bc3c8df8887",
url = "https://github.com/cython/cython/archive/refs/tags/3.0.12.tar.gz",
sha256 = "a156fff948c2013f2c8c398612c018e2b52314fdf0228af8fbdb5585e13699c2",
)

auto_http_archive(
3 changes: 2 additions & 1 deletion python/ray/_raylet.pxd
@@ -150,7 +150,8 @@ cdef class CoreWorker:
worker, outputs,
const CAddress &caller_address,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns,
CObjectID ref_generator_id=*)
ref_generator_id=*, # CObjectID
)
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle,
c_bool weak_ref)
cdef c_function_descriptors_to_python(
20 changes: 12 additions & 8 deletions python/ray/_raylet.pyx
@@ -440,7 +440,7 @@ class ObjectRefGenerator:

def _next_sync(
self,
timeout_s: Optional[float] = None
timeout_s: Optional[int | float] = None
) -> ObjectRef:
"""Waits for timeout_s and returns the object ref if available.

@@ -509,7 +509,7 @@

async def _next_async(
self,
timeout_s: Optional[float] = None
timeout_s: Optional[int | float] = None
):
"""Same API as _next_sync, but it is for async context."""
core_worker = self.worker.core_worker
@@ -1529,7 +1529,7 @@ cdef create_generator_return_obj(
worker, [output],
caller_address,
&intermediate_result,
generator_id)
generator_id.Binary())

return_object[0] = intermediate_result.back()

@@ -1657,7 +1657,7 @@ cdef execute_dynamic_generator_and_store_task_outputs(
worker, generator,
caller_address,
dynamic_returns,
generator_id)
generator_id.Binary())
except Exception as error:
is_retryable_error[0] = determine_if_retryable(
should_retry_exceptions,
@@ -4322,18 +4322,22 @@ cdef class CoreWorker:
const CAddress &caller_address,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]]
*returns,
CObjectID ref_generator_id=CObjectID.Nil()):
ref_generator_id=None):
cdef:
CObjectID return_id
size_t data_size
shared_ptr[CBuffer] metadata
c_vector[CObjectID] contained_id
int64_t task_output_inlined_bytes
int64_t num_returns = -1
CObjectID c_ref_generator_id = CObjectID.Nil()
shared_ptr[CRayObject] *return_ptr

if ref_generator_id:
c_ref_generator_id = CObjectID.FromBinary(ref_generator_id)

num_outputs_stored = 0
if not ref_generator_id.IsNil():
if not c_ref_generator_id.IsNil():
# The task specified a dynamic number of return values. Determine
# the expected number of return values.
if returns[0].size() > 0:
@@ -4403,15 +4407,15 @@

if not self.store_task_output(
serialized_object, return_id,
ref_generator_id,
c_ref_generator_id,
data_size, metadata, contained_id, caller_address,
&task_output_inlined_bytes, return_ptr):
# If the object already exists, but we fail to pin the copy, it
# means the existing copy might've gotten evicted. Try to
# create another copy.
self.store_task_output(
serialized_object, return_id,
ref_generator_id,
c_ref_generator_id,
data_size, metadata,
contained_id, caller_address, &task_output_inlined_bytes,
return_ptr)
28 changes: 14 additions & 14 deletions python/ray/includes/gcs_client.pxi
@@ -16,7 +16,7 @@ Binding of C++ ray::gcs::GcsClient.
#
# For how async API are implemented, see src/ray/gcs/gcs_client/python_callbacks.h
from asyncio import Future
from typing import List
from typing import List, Sequence
from libcpp.utility cimport move
import concurrent.futures
from ray.includes.common cimport (
@@ -277,7 +277,7 @@ cdef class InnerGcsClient:
# NodeInfo methods
#############################################################
def check_alive(
self, node_ips: List[bytes], timeout: Optional[float] = None
self, node_ips: List[bytes], timeout: Optional[int | float] = None
) -> List[bool]:
cdef:
int64_t timeout_ms = round(1000 * timeout) if timeout else -1
@@ -290,7 +290,7 @@
return raise_or_return(convert_multi_bool(status, move(results)))

def async_check_alive(
self, node_ips: List[bytes], timeout: Optional[float] = None
self, node_ips: List[bytes], timeout: Optional[int | float] = None
) -> Future[List[bool]]:
cdef:
int64_t timeout_ms = round(1000 * timeout) if timeout else -1
@@ -307,7 +307,7 @@
return asyncio.wrap_future(fut)

def drain_nodes(
self, node_ids: List[bytes], timeout: Optional[float] = None
self, node_ids: Sequence[bytes], timeout: Optional[int | float] = None
) -> List[bytes]:
"""returns a list of node_ids that are successfully drained."""
cdef:
@@ -316,14 +316,14 @@
c_vector[c_string] results
CRayStatus status
for node_id in node_ids:
c_node_ids.push_back(CNodeID.FromBinary(node_id))
c_node_ids.push_back(<CNodeID>CUniqueID.FromBinary(node_id))
with nogil:
status = self.inner.get().Nodes().DrainNodes(
c_node_ids, timeout_ms, results)
return raise_or_return(convert_multi_str(status, move(results)))

def get_all_node_info(
self, timeout: Optional[float] = None
self, timeout: Optional[int | float] = None
) -> Dict[NodeID, gcs_pb2.GcsNodeInfo]:
cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1
cdef c_vector[CGcsNodeInfo] reply
@@ -333,7 +333,7 @@
return raise_or_return(convert_get_all_node_info(status, move(reply)))

def async_get_all_node_info(
self, node_id: Optional[NodeID] = None, timeout: Optional[float] = None
self, node_id: Optional[NodeID] = None, timeout: Optional[int | float] = None
) -> Future[Dict[NodeID, gcs_pb2.GcsNodeInfo]]:
cdef:
int64_t timeout_ms = round(1000 * timeout) if timeout else -1
@@ -356,7 +356,7 @@
# NodeResources methods
#############################################################
def get_all_resource_usage(
self, timeout: Optional[float] = None
self, timeout: Optional[int | float] = None
) -> GetAllResourceUsageReply:
cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1
cdef CGetAllResourceUsageReply c_reply
@@ -381,7 +381,7 @@
actor_id: Optional[ActorID] = None,
job_id: Optional[JobID] = None,
actor_state_name: Optional[str] = None,
timeout: Optional[float] = None
timeout: Optional[int | float] = None
) -> Future[Dict[ActorID, gcs_pb2.ActorTableData]]:
cdef:
int64_t timeout_ms = round(1000 * timeout) if timeout else -1
@@ -409,7 +409,7 @@

def async_kill_actor(
self, actor_id: ActorID, c_bool force_kill, c_bool no_restart,
timeout: Optional[float] = None
timeout: Optional[int | float] = None
) -> ConcurrentFuture[None]:
"""
On success: returns None.
@@ -439,7 +439,7 @@
self, *, job_or_submission_id: Optional[str] = None,
skip_submission_job_info_field: bool = False,
skip_is_running_tasks_field: bool = False,
timeout: Optional[float] = None
timeout: Optional[int | float] = None
) -> Dict[JobID, gcs_pb2.JobTableData]:
cdef c_string c_job_or_submission_id
cdef optional[c_string] c_optional_job_or_submission_id = nullopt
@@ -462,7 +462,7 @@
self, *, job_or_submission_id: Optional[str] = None,
skip_submission_job_info_field: bool = False,
skip_is_running_tasks_field: bool = False,
timeout: Optional[float] = None
timeout: Optional[int | float] = None
) -> Future[Dict[JobID, gcs_pb2.JobTableData]]:
cdef:
c_string c_job_or_submission_id
@@ -507,7 +507,7 @@
#############################################################
def request_cluster_resource_constraint(
self,
bundles: c_vector[unordered_map[c_string, double]],
bundles: c_vector[unordered_map[c_string, cython.double]],
count_array: c_vector[int64_t],
timeout_s=None):
cdef:
@@ -629,7 +629,7 @@ cdef incremented_fut():
cpython.Py_INCREF(fut)
return fut

cdef void assign_and_decrement_fut(result, fut) with gil:
cdef void assign_and_decrement_fut(result, fut) noexcept with gil:
assert isinstance(fut, concurrent.futures.Future)

assert not fut.done()
8 changes: 4 additions & 4 deletions python/ray/includes/global_state_accessor.pxi
@@ -195,7 +195,7 @@ cdef class GlobalStateAccessor:

def get_worker_info(self, worker_id):
cdef unique_ptr[c_string] worker_info
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
with nogil:
worker_info = self.inner.get().GetWorkerInfo(cworker_id)
if worker_info:
@@ -211,14 +211,14 @@

def get_worker_debugger_port(self, worker_id):
cdef c_uint32_t result
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
with nogil:
result = self.inner.get().GetWorkerDebuggerPort(cworker_id)
return result

def update_worker_debugger_port(self, worker_id, debugger_port):
cdef c_bool result
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
cdef c_uint32_t cdebugger_port = debugger_port
with nogil:
result = self.inner.get().UpdateWorkerDebuggerPort(
@@ -228,7 +228,7 @@

def update_worker_num_paused_threads(self, worker_id, num_paused_threads_delta):
cdef c_bool result
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
cdef c_int32_t cnum_paused_threads_delta = num_paused_threads_delta

with nogil:
2 changes: 1 addition & 1 deletion python/ray/includes/object_ref.pxi
@@ -71,7 +71,7 @@ cdef class ObjectRef(BaseID):
pass

cdef CObjectID native(self):
return self.data
return <CObjectID>self.data

def binary(self):
return self.data.Binary()
53 changes: 13 additions & 40 deletions python/ray/includes/unique_ids.pxd
@@ -2,20 +2,13 @@ from libcpp cimport bool as c_bool
from libcpp.string cimport string as c_string
from libc.stdint cimport uint8_t, uint32_t, int64_t

# Note: we removed the staticmethod declarations in
# https://github.com/ray-project/ray/pull/47984 due
# to a compiler bug in Cython 3.0.x -- we should see
# if we can bring them back in Cython 3.1.x if the
# bug is fixed.
cdef extern from "ray/common/id.h" namespace "ray" nogil:
cdef cppclass CBaseID[T]:
@staticmethod
T FromBinary(const c_string &binary)

@staticmethod
T FromHex(const c_string &hex_str)

@staticmethod
const T Nil()

@staticmethod
size_t Size()

size_t Hash() const
c_bool IsNil() const
c_bool operator==(const CBaseID &rhs) const
@@ -25,7 +18,7 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
c_string Binary() const
c_string Hex() const

cdef cppclass CUniqueID "ray::UniqueID"(CBaseID):
cdef cppclass CUniqueID "ray::UniqueID"(CBaseID[CUniqueID]):
CUniqueID()

@staticmethod
@@ -40,13 +33,7 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
@staticmethod
const CUniqueID Nil()

@staticmethod
size_t Size()

cdef cppclass CActorClassID "ray::ActorClassID"(CUniqueID):

@staticmethod
CActorClassID FromBinary(const c_string &binary)
cdef cppclass CActorClassID "ray::ActorClassID"(CBaseID[CActorClassID]):

@staticmethod
CActorClassID FromHex(const c_string &hex_str)
@@ -71,26 +58,18 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:

CJobID JobId()

cdef cppclass CNodeID "ray::NodeID"(CUniqueID):

@staticmethod
CNodeID FromBinary(const c_string &binary)
cdef cppclass CNodeID "ray::NodeID"(CBaseID[CNodeID]):

@staticmethod
CNodeID FromHex(const c_string &hex_str)

@staticmethod
const CNodeID Nil()

cdef cppclass CConfigID "ray::ConfigID"(CUniqueID):

@staticmethod
CConfigID FromBinary(const c_string &binary)

cdef cppclass CFunctionID "ray::FunctionID"(CUniqueID):
cdef cppclass CConfigID "ray::ConfigID"(CBaseID[CConfigID]):
pass

@staticmethod
CFunctionID FromBinary(const c_string &binary)
cdef cppclass CFunctionID "ray::FunctionID"(CBaseID[CFunctionID]):

@staticmethod
CFunctionID FromHex(const c_string &hex_str)
@@ -175,10 +154,7 @@

CTaskID TaskId() const

cdef cppclass CClusterID "ray::ClusterID"(CUniqueID):

@staticmethod
CClusterID FromBinary(const c_string &binary)
cdef cppclass CClusterID "ray::ClusterID"(CBaseID[CClusterID]):

@staticmethod
CClusterID FromHex(const c_string &hex_str)
@@ -189,10 +165,7 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
@staticmethod
const CClusterID Nil()

cdef cppclass CWorkerID "ray::WorkerID"(CUniqueID):

@staticmethod
CWorkerID FromBinary(const c_string &binary)
cdef cppclass CWorkerID "ray::WorkerID"(CBaseID[CWorkerID]):

@staticmethod
CWorkerID FromHex(const c_string &hex_str)
