Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Initial port of Ray to Python 3.13 #47984

Merged
merged 24 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bazel/ray_deps_setup.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ def ray_deps_setup():
auto_http_archive(
name = "cython",
build_file = True,
url = "https://github.com/cython/cython/archive/refs/tags/0.29.37.tar.gz",
sha256 = "824eb14045d85c5af677536134199dd6709db8fb0835452fd2d54bc3c8df8887",
url = "https://github.com/cython/cython/archive/refs/tags/3.0.11.tar.gz",
sha256 = "2ec7d66d23d6da2328fb24f5c1bec6c63a59ec2e91027766ab904f417e1078aa",
)

auto_http_archive(
Expand Down
2 changes: 1 addition & 1 deletion python/ray/_raylet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ cdef class CoreWorker:
worker, outputs,
const CAddress &caller_address,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns,
CObjectID ref_generator_id=*)
ref_generator_id=*)
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle,
c_bool weak_ref)
cdef c_function_descriptors_to_python(
Expand Down
44 changes: 14 additions & 30 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ include "includes/metric.pxi"

# Expose GCC & Clang macro to report
# whether C++ optimizations were enabled during compilation.
OPTIMIZED = __OPTIMIZE__
# OPTIMIZED = __OPTIMIZE__
OPTIMIZED = True

GRPC_STATUS_CODE_UNAVAILABLE = CGrpcStatusCode.UNAVAILABLE
GRPC_STATUS_CODE_UNKNOWN = CGrpcStatusCode.UNKNOWN
Expand Down Expand Up @@ -635,28 +636,7 @@ def compute_task_id(ObjectRef object_ref):

cdef increase_recursion_limit():
"""Double the recusion limit if current depth is close to the limit"""
cdef:
CPyThreadState * s = <CPyThreadState *> PyThreadState_Get()
int current_limit = Py_GetRecursionLimit()
int new_limit = current_limit * 2
cdef extern from *:
"""
#if PY_VERSION_HEX >= 0x30C0000
#define CURRENT_DEPTH(x) ((x)->py_recursion_limit - (x)->py_recursion_remaining)
#elif PY_VERSION_HEX >= 0x30B00A4
#define CURRENT_DEPTH(x) ((x)->recursion_limit - (x)->recursion_remaining)
#else
#define CURRENT_DEPTH(x) ((x)->recursion_depth)
#endif
"""
int CURRENT_DEPTH(CPyThreadState *x)

int current_depth = CURRENT_DEPTH(s)
if current_limit - current_depth < 500:
Py_SetRecursionLimit(new_limit)
logger.debug("Increasing Python recursion limit to {} "
"current recursion depth is {}.".format(
new_limit, current_depth))
pass


cdef CObjectLocationPtrToDict(CObjectLocation* c_object_location):
Expand Down Expand Up @@ -1509,7 +1489,7 @@ cdef create_generator_return_obj(
worker, [output],
caller_address,
&intermediate_result,
generator_id)
generator_id.Binary())

return_object[0] = intermediate_result.back()

Expand Down Expand Up @@ -1637,7 +1617,7 @@ cdef execute_dynamic_generator_and_store_task_outputs(
worker, generator,
caller_address,
dynamic_returns,
generator_id)
generator_id.Binary())
except Exception as error:
is_retryable_error[0] = determine_if_retryable(
should_retry_exceptions,
Expand Down Expand Up @@ -2964,7 +2944,7 @@ cdef class OldGcsClient:
@_auto_reconnect
def request_cluster_resource_constraint(
self,
bundles: c_vector[unordered_map[c_string, double]],
bundles: c_vector[unordered_map[c_string, cython.double]],
count_array: c_vector[int64_t],
timeout_s=None):
cdef:
Expand Down Expand Up @@ -4601,18 +4581,22 @@ cdef class CoreWorker:
const CAddress &caller_address,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]]
*returns,
CObjectID ref_generator_id=CObjectID.Nil()):
ref_generator_id=None):
cdef:
CObjectID return_id
size_t data_size
shared_ptr[CBuffer] metadata
c_vector[CObjectID] contained_id
int64_t task_output_inlined_bytes
int64_t num_returns = -1
CObjectID c_ref_generator_id = CObjectID.Nil()
shared_ptr[CRayObject] *return_ptr

if ref_generator_id:
c_ref_generator_id = CObjectID.FromBinary(ref_generator_id)

num_outputs_stored = 0
if not ref_generator_id.IsNil():
if not c_ref_generator_id.IsNil():
# The task specified a dynamic number of return values. Determine
# the expected number of return values.
if returns[0].size() > 0:
Expand Down Expand Up @@ -4682,15 +4666,15 @@ cdef class CoreWorker:

if not self.store_task_output(
serialized_object, return_id,
ref_generator_id,
c_ref_generator_id,
data_size, metadata, contained_id, caller_address,
&task_output_inlined_bytes, return_ptr):
# If the object already exists, but we fail to pin the copy, it
# means the existing copy might've gotten evicted. Try to
# create another copy.
self.store_task_output(
serialized_object, return_id,
ref_generator_id,
c_ref_generator_id,
data_size, metadata,
contained_id, caller_address, &task_output_inlined_bytes,
return_ptr)
Expand Down
6 changes: 3 additions & 3 deletions python/ray/includes/gcs_client.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ cdef class NewGcsClient:
c_vector[c_string] results
CRayStatus status
for node_id in node_ids:
c_node_ids.push_back(CNodeID.FromBinary(node_id))
c_node_ids.push_back(<CNodeID>CUniqueID.FromBinary(node_id))
with nogil:
status = self.inner.get().Nodes().DrainNodes(
c_node_ids, timeout_ms, results)
Expand Down Expand Up @@ -504,7 +504,7 @@ cdef class NewGcsClient:
#############################################################
def request_cluster_resource_constraint(
self,
bundles: c_vector[unordered_map[c_string, double]],
bundles: c_vector[unordered_map[c_string, cython.double]],
count_array: c_vector[int64_t],
timeout_s=None):
cdef:
Expand Down Expand Up @@ -591,7 +591,7 @@ cdef incremented_fut():
cpython.Py_INCREF(fut)
return fut

cdef void assign_and_decrement_fut(result, fut) with gil:
cdef void assign_and_decrement_fut(result, fut) noexcept with gil:
assert isinstance(fut, concurrent.futures.Future)

assert not fut.done()
Expand Down
8 changes: 4 additions & 4 deletions python/ray/includes/global_state_accessor.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ cdef class GlobalStateAccessor:

def get_worker_info(self, worker_id):
cdef unique_ptr[c_string] worker_info
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
with nogil:
worker_info = self.inner.get().GetWorkerInfo(cworker_id)
if worker_info:
Expand All @@ -200,14 +200,14 @@ cdef class GlobalStateAccessor:

def get_worker_debugger_port(self, worker_id):
cdef c_uint32_t result
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
with nogil:
result = self.inner.get().GetWorkerDebuggerPort(cworker_id)
return result

def update_worker_debugger_port(self, worker_id, debugger_port):
cdef c_bool result
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
cdef c_uint32_t cdebugger_port = debugger_port
with nogil:
result = self.inner.get().UpdateWorkerDebuggerPort(
Expand All @@ -217,7 +217,7 @@ cdef class GlobalStateAccessor:

def update_worker_num_paused_threads(self, worker_id, num_paused_threads_delta):
cdef c_bool result
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
cdef c_int32_t cnum_paused_threads_delta = num_paused_threads_delta

with nogil:
Expand Down
2 changes: 1 addition & 1 deletion python/ray/includes/object_ref.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ cdef class ObjectRef(BaseID):
pass

cdef CObjectID native(self):
return self.data
return <CObjectID>self.data

def binary(self):
return self.data.Binary()
Expand Down
55 changes: 28 additions & 27 deletions python/ray/includes/unique_ids.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ from libc.stdint cimport uint8_t, uint32_t, int64_t

cdef extern from "ray/common/id.h" namespace "ray" nogil:
cdef cppclass CBaseID[T]:
@staticmethod
T FromBinary(const c_string &binary)
# @staticmethod
# T FromBinary(const c_string &binary)

@staticmethod
T FromHex(const c_string &hex_str)
# @staticmethod
# T FromHex(const c_string &hex_str)

@staticmethod
const T Nil()
# @staticmethod
# const T Nil()

@staticmethod
size_t Size()
# @staticmethod
# size_t Size()

size_t Hash() const
c_bool IsNil() const
Expand All @@ -25,7 +25,7 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
c_string Binary() const
c_string Hex() const

cdef cppclass CUniqueID "ray::UniqueID"(CBaseID):
cdef cppclass CUniqueID "ray::UniqueID"(CBaseID[CUniqueID]):
CUniqueID()

@staticmethod
Expand All @@ -40,13 +40,13 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
@staticmethod
const CUniqueID Nil()

@staticmethod
size_t Size()
# @staticmethod
# size_t Size()

cdef cppclass CActorClassID "ray::ActorClassID"(CUniqueID):
cdef cppclass CActorClassID "ray::ActorClassID"(CBaseID[CActorClassID]):

@staticmethod
CActorClassID FromBinary(const c_string &binary)
# @staticmethod
# CActorClassID FromBinary(const c_string &binary)

@staticmethod
CActorClassID FromHex(const c_string &hex_str)
Expand All @@ -71,26 +71,27 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:

CJobID JobId()

cdef cppclass CNodeID "ray::NodeID"(CUniqueID):
cdef cppclass CNodeID "ray::NodeID"(CBaseID[CNodeID]):

@staticmethod
CNodeID FromBinary(const c_string &binary)
# @staticmethod
# CNodeID FromBinary(const c_string &binary)

@staticmethod
CNodeID FromHex(const c_string &hex_str)

@staticmethod
const CNodeID Nil()

cdef cppclass CConfigID "ray::ConfigID"(CUniqueID):
cdef cppclass CConfigID "ray::ConfigID"(CBaseID[CConfigID]):
pass

@staticmethod
CConfigID FromBinary(const c_string &binary)

cdef cppclass CFunctionID "ray::FunctionID"(CUniqueID):
cdef cppclass CFunctionID "ray::FunctionID"(CBaseID[CFunctionID]):

@staticmethod
CFunctionID FromBinary(const c_string &binary)
# @staticmethod
# CFunctionID FromBinary(const c_string &binary)

@staticmethod
CFunctionID FromHex(const c_string &hex_str)
Expand Down Expand Up @@ -175,10 +176,10 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:

CTaskID TaskId() const

cdef cppclass CClusterID "ray::ClusterID"(CUniqueID):
cdef cppclass CClusterID "ray::ClusterID"(CBaseID[CClusterID]):

@staticmethod
CClusterID FromBinary(const c_string &binary)
# @staticmethod
# CClusterID FromBinary(const c_string &binary)

@staticmethod
CClusterID FromHex(const c_string &hex_str)
Expand All @@ -189,10 +190,10 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
@staticmethod
const CClusterID Nil()

cdef cppclass CWorkerID "ray::WorkerID"(CUniqueID):
cdef cppclass CWorkerID "ray::WorkerID"(CBaseID[CWorkerID]):

@staticmethod
CWorkerID FromBinary(const c_string &binary)
# @staticmethod
# CWorkerID FromBinary(const c_string &binary)

@staticmethod
CWorkerID FromHex(const c_string &hex_str)
Expand Down
10 changes: 5 additions & 5 deletions python/ray/includes/unique_ids.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ cdef class NodeID(UniqueID):

def __init__(self, id):
check_id(id)
self.data = CNodeID.FromBinary(<c_string>id)
self.data = CUniqueID.FromBinary(<c_string>id)

@classmethod
def from_hex(cls, hex_id):
Expand Down Expand Up @@ -276,7 +276,7 @@ cdef class WorkerID(UniqueID):

def __init__(self, id):
check_id(id)
self.data = CWorkerID.FromBinary(<c_string>id)
self.data = CUniqueID.FromBinary(<c_string>id)

@classmethod
def from_hex(cls, hex_id):
Expand Down Expand Up @@ -344,7 +344,7 @@ cdef class FunctionID(UniqueID):

def __init__(self, id):
check_id(id)
self.data = CFunctionID.FromBinary(<c_string>id)
self.data = CUniqueID.FromBinary(<c_string>id)

@classmethod
def from_hex(cls, hex_id):
Expand All @@ -359,7 +359,7 @@ cdef class ActorClassID(UniqueID):

def __init__(self, id):
check_id(id)
self.data = CActorClassID.FromBinary(<c_string>id)
self.data = CUniqueID.FromBinary(<c_string>id)

@classmethod
def from_hex(cls, hex_id):
Expand All @@ -373,7 +373,7 @@ cdef class ClusterID(UniqueID):

def __init__(self, id):
check_id(id)
self.data = CClusterID.FromBinary(<c_string>id)
self.data = CUniqueID.FromBinary(<c_string>id)

@classmethod
def from_hex(cls, hex_id):
Expand Down
6 changes: 4 additions & 2 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

logger = logging.getLogger(__name__)

SUPPORTED_PYTHONS = [(3, 9), (3, 10), (3, 11), (3, 12)]
SUPPORTED_PYTHONS = [(3, 9), (3, 10), (3, 11), (3, 12), (3, 13)]
# When the bazel version is updated, make sure to update it
# in WORKSPACE file as well.

Expand Down Expand Up @@ -528,6 +528,7 @@ def build(build_python, build_java, build_cpp):
# version of Python to build packages inside the build.sh script. Note
# that certain flags will not be passed along such as --user or sudo.
# TODO(rkn): Fix this.
"""
if not os.getenv("SKIP_THIRDPARTY_INSTALL"):
pip_packages = ["psutil", "setproctitle==1.2.2", "colorama"]
subprocess.check_call(
Expand Down Expand Up @@ -556,6 +557,7 @@ def build(build_python, build_java, build_cpp):
]
+ runtime_env_agent_pip_packages
)
"""

bazel_flags = ["--verbose_failures"]
if BAZEL_ARGS:
Expand Down Expand Up @@ -798,7 +800,7 @@ def has_ext_modules(self):
# The BinaryDistribution argument triggers build_ext.
distclass=BinaryDistribution,
install_requires=setup_spec.install_requires,
setup_requires=["cython >= 0.29.32", "wheel"],
setup_requires=["cython >= 3.0.11", "wheel"],
extras_require=setup_spec.extras,
entry_points={
"console_scripts": [
Expand Down