diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 722ff8b846c8..8f18250d22de 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -13,6 +13,7 @@ import ray.ray_constants as ray_constants from ray.utils import binary_to_hex, setup_logger from ray.autoscaler.commands import teardown_cluster +import redis logger = logging.getLogger(__name__) @@ -161,7 +162,11 @@ def process_messages(self, max_messages=10000): subscribe_clients = [self.primary_subscribe_client] for subscribe_client in subscribe_clients: for _ in range(max_messages): - message = subscribe_client.get_message() + message = None + try: + message = subscribe_client.get_message() + except redis.exceptions.ConnectionError: + pass if message is None: # Continue on to the next subscribe client. break diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 9cba80710d50..74690e67d66f 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -152,6 +152,11 @@ void CoreWorkerProcess::EnsureInitialized() { CoreWorker &CoreWorkerProcess::GetCoreWorker() { EnsureInitialized(); if (instance_->options_.num_workers == 1) { + // TODO(mehrdadn): Remove this when the bug is resolved. + // Somewhat consistently reproducible via + // python/ray/tests/test_basic.py::test_background_tasks_with_max_calls + // with -c opt on Windows. + RAY_CHECK(instance_->global_worker_) << "global_worker_ must not be NULL"; return *instance_->global_worker_; } auto ptr = current_core_worker_.lock(); diff --git a/src/ray/gcs/asio.cc b/src/ray/gcs/asio.cc index f22165fd942e..8e71fb83869c 100644 --- a/src/ray/gcs/asio.cc +++ b/src/ray/gcs/asio.cc @@ -79,7 +79,8 @@ void RedisAsioClient::operate() { void RedisAsioClient::handle_read(boost::system::error_code error_code) { RAY_CHECK(!error_code || error_code == boost::asio::error::would_block || - error_code == boost::asio::error::connection_reset); + error_code == boost::asio::error::connection_reset) + << "handle_read(error_code = " << error_code << ")"; read_in_progress_ = false; redis_async_context_.RedisAsyncHandleRead(); @@ -90,7 +91,8 @@ void RedisAsioClient::handle_read(boost::system::error_code error_code) { void RedisAsioClient::handle_write(boost::system::error_code error_code) { RAY_CHECK(!error_code || error_code == boost::asio::error::would_block || - error_code == boost::asio::error::connection_reset); + error_code == boost::asio::error::connection_reset) + << "handle_write(error_code = " << error_code << ")"; write_in_progress_ = false; redis_async_context_.RedisAsyncHandleWrite(); diff --git a/src/ray/gcs/redis_async_context.cc b/src/ray/gcs/redis_async_context.cc index 5af88cc6db69..33f388684222 100644 --- a/src/ray/gcs/redis_async_context.cc +++ b/src/ray/gcs/redis_async_context.cc @@ -50,7 +50,11 @@ void RedisAsyncContext::RedisAsyncHandleRead() { // This function will execute the callbacks which are registered by // `redisvAsyncCommand`, `redisAsyncCommandArgv` and so on. std::lock_guard lock(mutex_); - + // TODO(mehrdadn): Remove this when the bug is resolved. + // Somewhat consistently reproducible via + // python/ray/tests/test_basic.py::test_background_tasks_with_max_calls + // with -c opt on Windows. + RAY_CHECK(redis_async_context_) << "redis_async_context_ must not be NULL here"; redisAsyncHandleRead(redis_async_context_); }