
Commit

[CELEBORN-1774] Update default value of celeborn.<module>.io.mode based on whether epoll mode is available
SteNicholas committed Dec 17, 2024
1 parent 72eb9c0 commit b4e9b70
Showing 6 changed files with 7 additions and 17 deletions.
@@ -30,7 +30,6 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.socket.SocketChannel;
import org.apache.commons.lang3.SystemUtils;
import org.slf4j.Logger;
@@ -101,7 +100,6 @@ private void init(String hostToBind, int portToBind) {
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, allocator)
.option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
.option(EpollChannelOption.SO_REUSEPORT, IOMode.EPOLL.equals(ioMode))
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.ALLOCATOR, allocator);
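The bootstrap above still selects the server channel class from the configured IO mode via NettyUtils.getServerChannelClass; only the SO_REUSEPORT option is dropped. A minimal sketch of such a mode-to-channel mapping (hypothetical, not Celeborn's actual implementation):

```scala
import io.netty.channel.ServerChannel
import io.netty.channel.epoll.{Epoll, EpollServerSocketChannel}
import io.netty.channel.socket.nio.NioServerSocketChannel

object ChannelSelectionSketch {
  // Pick the Netty server channel class for the requested IO mode,
  // falling back to NIO when the native epoll transport is unavailable.
  def serverChannelClass(ioMode: String): Class[_ <: ServerChannel] =
    ioMode match {
      case "EPOLL" if Epoll.isAvailable => classOf[EpollServerSocketChannel]
      case _ => classOf[NioServerSocketChannel]
    }
}
```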
@@ -1920,7 +1920,7 @@ object CelebornConf extends Logging {
val NETWORK_IO_MODE: ConfigEntry[String] =
buildConf("celeborn.<module>.io.mode")
.categories("network")
.doc("Netty EventLoopGroup backend, available options: NIO, EPOLL. The default IO mode is EPOLL for the available epoll mode.")
.doc("Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO.")
.stringConf
.transform(_.toUpperCase)
.checkValues(Set(IOMode.NIO.name(), IOMode.EPOLL.name()))
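A minimal sketch (not the exact CelebornConf code) of how a default value can follow epoll availability, matching the doc string above:

```scala
import io.netty.channel.epoll.Epoll

object IoModeDefaultSketch {
  // Use EPOLL only when the native epoll transport is usable on this host,
  // otherwise fall back to NIO.
  def defaultIoModeName: String =
    if (Epoll.isAvailable) "EPOLL" else "NIO"
}
```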
@@ -328,8 +328,7 @@ object Utils extends Logging {
}
isBindCollision(e.getCause)
case e: NativeIoException =>
(e.getMessage != null && e.getMessage.startsWith("bind() failed: ")) ||
isBindCollision(e.getCause)
e.getMessage != null && e.getMessage.startsWith("bind")
case e: Exception => isBindCollision(e.getCause)
case _ => false
}
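The tightened check classifies a failed native bind as a collision so callers can retry on another port. A hypothetical sketch of such a retry loop, assuming a bind-collision predicate like the one above is available (Celeborn's real port-retry logic may differ):

```scala
import java.net.BindException

object BindRetrySketch {
  // Hypothetical helper: try successive ports until bind succeeds or retries run out.
  def startOnPort(startPort: Int, maxRetries: Int)(bind: Int => Unit): Int = {
    var offset = 0
    while (offset <= maxRetries) {
      val port = startPort + offset
      try {
        bind(port)
        return port
      } catch {
        case e: Exception if isBindCollision(e) => offset += 1
      }
    }
    throw new BindException(s"Could not bind after $maxRetries retries starting at $startPort")
  }

  // Simplified stand-in for the collision check shown in the diff above.
  private def isBindCollision(e: Throwable): Boolean = e match {
    case _: BindException => true
    case other if other != null => isBindCollision(other.getCause)
    case _ => false
  }
}
```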
@@ -25,7 +25,6 @@ import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._

import io.netty.channel.epoll.Epoll
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, never, verify}
import org.scalatest.concurrent.Eventually._
@@ -669,15 +668,9 @@ abstract class RpcEnvSuite extends CelebornFunSuite {
test("port conflict") {
val anotherEnv = createRpcEnv(createCelebornConf(), "remote", env.address.port)
try {
if (Epoll.isAvailable) {
assert(
anotherEnv.address.port == env.address.port,
s"new port = ${anotherEnv.address.port}, env port = ${env.address.port}")
} else {
assert(
anotherEnv.address.port != env.address.port,
s"new port = ${anotherEnv.address.port}, env port = ${env.address.port}")
}
assert(
anotherEnv.address.port != env.address.port,
s"new port = ${anotherEnv.address.port}, env port = ${env.address.port}")
} finally {
anotherEnv.shutdown()
anotherEnv.awaitTermination()
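The simplified assertion reflects that, without SO_REUSEPORT, a second bind to a port already in use fails, so the conflicting env is expected to come up on a different port. A standalone illustration with plain sockets (not Celeborn code):

```scala
import java.net.{BindException, ServerSocket}

object PortConflictSketch {
  def main(args: Array[String]): Unit = {
    val first = new ServerSocket(0) // bind an ephemeral port
    try {
      new ServerSocket(first.getLocalPort) // second bind to the same port fails
    } catch {
      case _: BindException => println(s"port ${first.getLocalPort} already in use")
    } finally {
      first.close()
    }
  }
}
```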
2 changes: 1 addition & 1 deletion docs/configuration/network.md
@@ -29,7 +29,7 @@ license: |
| celeborn.&lt;module&gt;.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | | |
| celeborn.&lt;module&gt;.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting <module> to `fetch`, it works for worker fetch server. | | |
| celeborn.&lt;module&gt;.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. If setting <module> to `push`, it works for Flink shuffle client push data. | | |
| celeborn.&lt;module&gt;.io.mode | EPOLL | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. The default IO mode is EPOLL for the available epoll mode. | | |
| celeborn.&lt;module&gt;.io.mode | EPOLL | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | |
| celeborn.&lt;module&gt;.io.numConnectionsPerPeer | 1 | false | Number of concurrent connections between two nodes. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. | | |
| celeborn.&lt;module&gt;.io.preferDirectBufs | true | false | If true, we will prefer allocating off-heap byte buffers within Netty. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting <module> to `fetch`, it works for worker fetch server. | | |
| celeborn.&lt;module&gt;.io.receiveBuffer | 0b | false | Receive buffer size (SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 10Gbps buffer size should be ~ 1.25MB. If setting <module> to `rpc_app`, works for shuffle client. If setting <module> to `rpc_service`, works for master or worker. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `push`, it works for worker receiving push data. If setting <module> to `replicate`, it works for replicate server or client of worker replicating data to peer worker. If setting <module> to `fetch`, it works for worker fetch server. | 0.2.0 | |
2 changes: 1 addition & 1 deletion docs/migration.md
@@ -29,7 +29,7 @@ license: |

- Since 0.6.0, Celeborn changed the default value of `celeborn.client.spark.fetch.throwsFetchFailure` from `false` to `true`, which means Celeborn will enable spark stage rerun at default.

- Since 0.6.0, Celeborn changed the default value of `celeborn.<module>.io.mode` from `NIO` to whether epoll mode is available, which means Celeborn will regard `EPOLL` as the default IO mode for the available epoll mode.
- Since 0.6.0, Celeborn changed the default value of `celeborn.<module>.io.mode` from `NIO` to `EPOLL` if epoll mode is available, falling back to `NIO` otherwise.

- Since 0.6.0, Celeborn has introduced a new RESTful API namespace: /api/v1, which uses the application/json media type for requests and responses.
The `celeborn-openapi-client` SDK is also available to help users interact with the new RESTful APIs.
