Skip to content

Commit

Permalink
Revert "[Core] Initial port of Ray to Python 3.13 (ray-project#47984)"
Browse files Browse the repository at this point in the history
This reverts commit d2004b6.

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
  • Loading branch information
edoakes committed Feb 19, 2025
1 parent 3aa9634 commit 94cefe2
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 55 deletions.
4 changes: 2 additions & 2 deletions bazel/ray_deps_setup.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ def ray_deps_setup():
auto_http_archive(
name = "cython",
build_file = True,
url = "https://github.com/cython/cython/archive/refs/tags/3.0.12.tar.gz",
sha256 = "a156fff948c2013f2c8c398612c018e2b52314fdf0228af8fbdb5585e13699c2",
url = "https://github.com/cython/cython/archive/refs/tags/0.29.37.tar.gz",
sha256 = "824eb14045d85c5af677536134199dd6709db8fb0835452fd2d54bc3c8df8887",
)

auto_http_archive(
Expand Down
3 changes: 1 addition & 2 deletions python/ray/_raylet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ cdef class CoreWorker:
worker, outputs,
const CAddress &caller_address,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]] *returns,
ref_generator_id=*, # CObjectID
)
CObjectID ref_generator_id=*)
cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle,
c_bool weak_ref)
cdef c_function_descriptors_to_python(
Expand Down
20 changes: 8 additions & 12 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ class ObjectRefGenerator:

def _next_sync(
self,
timeout_s: Optional[int | float] = None
timeout_s: Optional[float] = None
) -> ObjectRef:
"""Waits for timeout_s and returns the object ref if available.

Expand Down Expand Up @@ -509,7 +509,7 @@ class ObjectRefGenerator:

async def _next_async(
self,
timeout_s: Optional[int | float] = None
timeout_s: Optional[float] = None
):
"""Same API as _next_sync, but it is for async context."""
core_worker = self.worker.core_worker
Expand Down Expand Up @@ -1529,7 +1529,7 @@ cdef create_generator_return_obj(
worker, [output],
caller_address,
&intermediate_result,
generator_id.Binary())
generator_id)

return_object[0] = intermediate_result.back()

Expand Down Expand Up @@ -1657,7 +1657,7 @@ cdef execute_dynamic_generator_and_store_task_outputs(
worker, generator,
caller_address,
dynamic_returns,
generator_id.Binary())
generator_id)
except Exception as error:
is_retryable_error[0] = determine_if_retryable(
should_retry_exceptions,
Expand Down Expand Up @@ -4322,22 +4322,18 @@ cdef class CoreWorker:
const CAddress &caller_address,
c_vector[c_pair[CObjectID, shared_ptr[CRayObject]]]
*returns,
ref_generator_id=None):
CObjectID ref_generator_id=CObjectID.Nil()):
cdef:
CObjectID return_id
size_t data_size
shared_ptr[CBuffer] metadata
c_vector[CObjectID] contained_id
int64_t task_output_inlined_bytes
int64_t num_returns = -1
CObjectID c_ref_generator_id = CObjectID.Nil()
shared_ptr[CRayObject] *return_ptr

if ref_generator_id:
c_ref_generator_id = CObjectID.FromBinary(ref_generator_id)

num_outputs_stored = 0
if not c_ref_generator_id.IsNil():
if not ref_generator_id.IsNil():
# The task specified a dynamic number of return values. Determine
# the expected number of return values.
if returns[0].size() > 0:
Expand Down Expand Up @@ -4407,15 +4403,15 @@ cdef class CoreWorker:

if not self.store_task_output(
serialized_object, return_id,
c_ref_generator_id,
ref_generator_id,
data_size, metadata, contained_id, caller_address,
&task_output_inlined_bytes, return_ptr):
# If the object already exists, but we fail to pin the copy, it
# means the existing copy might've gotten evicted. Try to
# create another copy.
self.store_task_output(
serialized_object, return_id,
c_ref_generator_id,
ref_generator_id,
data_size, metadata,
contained_id, caller_address, &task_output_inlined_bytes,
return_ptr)
Expand Down
28 changes: 14 additions & 14 deletions python/ray/includes/gcs_client.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Binding of C++ ray::gcs::GcsClient.
#
# For how async API are implemented, see src/ray/gcs/gcs_client/python_callbacks.h
from asyncio import Future
from typing import List, Sequence
from typing import List
from libcpp.utility cimport move
import concurrent.futures
from ray.includes.common cimport (
Expand Down Expand Up @@ -277,7 +277,7 @@ cdef class InnerGcsClient:
# NodeInfo methods
#############################################################
def check_alive(
self, node_ips: List[bytes], timeout: Optional[int | float] = None
self, node_ips: List[bytes], timeout: Optional[float] = None
) -> List[bool]:
cdef:
int64_t timeout_ms = round(1000 * timeout) if timeout else -1
Expand All @@ -290,7 +290,7 @@ cdef class InnerGcsClient:
return raise_or_return(convert_multi_bool(status, move(results)))

def async_check_alive(
self, node_ips: List[bytes], timeout: Optional[int | float] = None
self, node_ips: List[bytes], timeout: Optional[float] = None
) -> Future[List[bool]]:
cdef:
int64_t timeout_ms = round(1000 * timeout) if timeout else -1
Expand All @@ -307,7 +307,7 @@ cdef class InnerGcsClient:
return asyncio.wrap_future(fut)

def drain_nodes(
self, node_ids: Sequence[bytes], timeout: Optional[int | float] = None
self, node_ids: List[bytes], timeout: Optional[float] = None
) -> List[bytes]:
"""returns a list of node_ids that are successfully drained."""
cdef:
Expand All @@ -316,14 +316,14 @@ cdef class InnerGcsClient:
c_vector[c_string] results
CRayStatus status
for node_id in node_ids:
c_node_ids.push_back(<CNodeID>CUniqueID.FromBinary(node_id))
c_node_ids.push_back(CNodeID.FromBinary(node_id))
with nogil:
status = self.inner.get().Nodes().DrainNodes(
c_node_ids, timeout_ms, results)
return raise_or_return(convert_multi_str(status, move(results)))

def get_all_node_info(
self, timeout: Optional[int | float] = None
self, timeout: Optional[float] = None
) -> Dict[NodeID, gcs_pb2.GcsNodeInfo]:
cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1
cdef c_vector[CGcsNodeInfo] reply
Expand All @@ -333,7 +333,7 @@ cdef class InnerGcsClient:
return raise_or_return(convert_get_all_node_info(status, move(reply)))

def async_get_all_node_info(
self, node_id: Optional[NodeID] = None, timeout: Optional[int | float] = None
self, node_id: Optional[NodeID] = None, timeout: Optional[float] = None
) -> Future[Dict[NodeID, gcs_pb2.GcsNodeInfo]]:
cdef:
int64_t timeout_ms = round(1000 * timeout) if timeout else -1
Expand All @@ -356,7 +356,7 @@ cdef class InnerGcsClient:
# NodeResources methods
#############################################################
def get_all_resource_usage(
self, timeout: Optional[int | float] = None
self, timeout: Optional[float] = None
) -> GetAllResourceUsageReply:
cdef int64_t timeout_ms = round(1000 * timeout) if timeout else -1
cdef CGetAllResourceUsageReply c_reply
Expand All @@ -381,7 +381,7 @@ cdef class InnerGcsClient:
actor_id: Optional[ActorID] = None,
job_id: Optional[JobID] = None,
actor_state_name: Optional[str] = None,
timeout: Optional[int | float] = None
timeout: Optional[float] = None
) -> Future[Dict[ActorID, gcs_pb2.ActorTableData]]:
cdef:
int64_t timeout_ms = round(1000 * timeout) if timeout else -1
Expand Down Expand Up @@ -409,7 +409,7 @@ cdef class InnerGcsClient:

def async_kill_actor(
self, actor_id: ActorID, c_bool force_kill, c_bool no_restart,
timeout: Optional[int | float] = None
timeout: Optional[float] = None
) -> ConcurrentFuture[None]:
"""
On success: returns None.
Expand Down Expand Up @@ -439,7 +439,7 @@ cdef class InnerGcsClient:
self, *, job_or_submission_id: Optional[str] = None,
skip_submission_job_info_field: bool = False,
skip_is_running_tasks_field: bool = False,
timeout: Optional[int | float] = None
timeout: Optional[float] = None
) -> Dict[JobID, gcs_pb2.JobTableData]:
cdef c_string c_job_or_submission_id
cdef optional[c_string] c_optional_job_or_submission_id = nullopt
Expand All @@ -462,7 +462,7 @@ cdef class InnerGcsClient:
self, *, job_or_submission_id: Optional[str] = None,
skip_submission_job_info_field: bool = False,
skip_is_running_tasks_field: bool = False,
timeout: Optional[int | float] = None
timeout: Optional[float] = None
) -> Future[Dict[JobID, gcs_pb2.JobTableData]]:
cdef:
c_string c_job_or_submission_id
Expand Down Expand Up @@ -507,7 +507,7 @@ cdef class InnerGcsClient:
#############################################################
def request_cluster_resource_constraint(
self,
bundles: c_vector[unordered_map[c_string, cython.double]],
bundles: c_vector[unordered_map[c_string, double]],
count_array: c_vector[int64_t],
timeout_s=None):
cdef:
Expand Down Expand Up @@ -629,7 +629,7 @@ cdef incremented_fut():
cpython.Py_INCREF(fut)
return fut

cdef void assign_and_decrement_fut(result, fut) noexcept with gil:
cdef void assign_and_decrement_fut(result, fut) with gil:
assert isinstance(fut, concurrent.futures.Future)

assert not fut.done()
Expand Down
8 changes: 4 additions & 4 deletions python/ray/includes/global_state_accessor.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ cdef class GlobalStateAccessor:

def get_worker_info(self, worker_id):
cdef unique_ptr[c_string] worker_info
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
with nogil:
worker_info = self.inner.get().GetWorkerInfo(cworker_id)
if worker_info:
Expand All @@ -211,14 +211,14 @@ cdef class GlobalStateAccessor:

def get_worker_debugger_port(self, worker_id):
cdef c_uint32_t result
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
with nogil:
result = self.inner.get().GetWorkerDebuggerPort(cworker_id)
return result

def update_worker_debugger_port(self, worker_id, debugger_port):
cdef c_bool result
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
cdef c_uint32_t cdebugger_port = debugger_port
with nogil:
result = self.inner.get().UpdateWorkerDebuggerPort(
Expand All @@ -228,7 +228,7 @@ cdef class GlobalStateAccessor:

def update_worker_num_paused_threads(self, worker_id, num_paused_threads_delta):
cdef c_bool result
cdef CWorkerID cworker_id = <CWorkerID>CUniqueID.FromBinary(worker_id.binary())
cdef CWorkerID cworker_id = CWorkerID.FromBinary(worker_id.binary())
cdef c_int32_t cnum_paused_threads_delta = num_paused_threads_delta

with nogil:
Expand Down
2 changes: 1 addition & 1 deletion python/ray/includes/object_ref.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ cdef class ObjectRef(BaseID):
pass

cdef CObjectID native(self):
return <CObjectID>self.data
return self.data

def binary(self):
return self.data.Binary()
Expand Down
53 changes: 40 additions & 13 deletions python/ray/includes/unique_ids.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@ from libcpp cimport bool as c_bool
from libcpp.string cimport string as c_string
from libc.stdint cimport uint8_t, uint32_t, int64_t

# Note: we removed the staticmethod declarations in
# https://github.com/ray-project/ray/pull/47984 due
# to a compiler bug in Cython 3.0.x -- we should see
# if we can bring them back in Cython 3.1.x if the
# bug is fixed.
cdef extern from "ray/common/id.h" namespace "ray" nogil:
cdef cppclass CBaseID[T]:
@staticmethod
T FromBinary(const c_string &binary)

@staticmethod
T FromHex(const c_string &hex_str)

@staticmethod
const T Nil()

@staticmethod
size_t Size()

size_t Hash() const
c_bool IsNil() const
c_bool operator==(const CBaseID &rhs) const
Expand All @@ -18,7 +25,7 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
c_string Binary() const
c_string Hex() const

cdef cppclass CUniqueID "ray::UniqueID"(CBaseID[CUniqueID]):
cdef cppclass CUniqueID "ray::UniqueID"(CBaseID):
CUniqueID()

@staticmethod
Expand All @@ -33,7 +40,13 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
@staticmethod
const CUniqueID Nil()

cdef cppclass CActorClassID "ray::ActorClassID"(CBaseID[CActorClassID]):
@staticmethod
size_t Size()

cdef cppclass CActorClassID "ray::ActorClassID"(CUniqueID):

@staticmethod
CActorClassID FromBinary(const c_string &binary)

@staticmethod
CActorClassID FromHex(const c_string &hex_str)
Expand All @@ -58,18 +71,26 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:

CJobID JobId()

cdef cppclass CNodeID "ray::NodeID"(CBaseID[CNodeID]):
cdef cppclass CNodeID "ray::NodeID"(CUniqueID):

@staticmethod
CNodeID FromBinary(const c_string &binary)

@staticmethod
CNodeID FromHex(const c_string &hex_str)

@staticmethod
const CNodeID Nil()

cdef cppclass CConfigID "ray::ConfigID"(CBaseID[CConfigID]):
pass
cdef cppclass CConfigID "ray::ConfigID"(CUniqueID):

@staticmethod
CConfigID FromBinary(const c_string &binary)

cdef cppclass CFunctionID "ray::FunctionID"(CUniqueID):

cdef cppclass CFunctionID "ray::FunctionID"(CBaseID[CFunctionID]):
@staticmethod
CFunctionID FromBinary(const c_string &binary)

@staticmethod
CFunctionID FromHex(const c_string &hex_str)
Expand Down Expand Up @@ -154,7 +175,10 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:

CTaskID TaskId() const

cdef cppclass CClusterID "ray::ClusterID"(CBaseID[CClusterID]):
cdef cppclass CClusterID "ray::ClusterID"(CUniqueID):

@staticmethod
CClusterID FromBinary(const c_string &binary)

@staticmethod
CClusterID FromHex(const c_string &hex_str)
Expand All @@ -165,7 +189,10 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
@staticmethod
const CClusterID Nil()

cdef cppclass CWorkerID "ray::WorkerID"(CBaseID[CWorkerID]):
cdef cppclass CWorkerID "ray::WorkerID"(CUniqueID):

@staticmethod
CWorkerID FromBinary(const c_string &binary)

@staticmethod
CWorkerID FromHex(const c_string &hex_str)
Expand Down
Loading

0 comments on commit 94cefe2

Please sign in to comment.