From ba28a8fcbc3ba432e7ea4d6f0b535450a6ec96c6 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 10 Aug 2014 20:36:54 -0700 Subject: [PATCH] [SPARK-2936] Migrate Netty network module from Java to Scala The Netty network module was originally written when Scala 2.9.x had a bug that prevents a pure Scala implementation, and a subset of the files were done in Java. We have since upgraded to Scala 2.10, and can migrate all Java files now to Scala. https://github.com/netty/netty/issues/781 https://github.com/mesos/spark/pull/522 Author: Reynold Xin Closes #1865 from rxin/netty and squashes the following commits: 332422f [Reynold Xin] Code review feedback ca9eeee [Reynold Xin] Minor update. 7f1434b [Reynold Xin] [SPARK-2936] Migrate Netty network module from Java to Scala --- .../spark/network/netty/FileClient.java | 100 ---------------- .../spark/network/netty/FileServer.java | 111 ------------------ .../network/netty/FileServerHandler.java | 83 ------------- .../spark/network/netty/FileClient.scala | 85 ++++++++++++++ .../netty/FileClientChannelInitializer.scala} | 24 ++-- .../network/netty/FileClientHandler.scala} | 47 ++++---- .../spark/network/netty/FileHeader.scala | 5 +- .../spark/network/netty/FileServer.scala | 91 ++++++++++++++ .../netty/FileServerChannelInitializer.scala} | 31 ++--- .../network/netty/FileServerHandler.scala | 68 +++++++++++ .../spark/network/netty/PathResolver.scala} | 9 +- .../spark/network/netty/ShuffleSender.scala | 2 +- 12 files changed, 292 insertions(+), 364 deletions(-) delete mode 100644 core/src/main/java/org/apache/spark/network/netty/FileClient.java delete mode 100644 core/src/main/java/org/apache/spark/network/netty/FileServer.java delete mode 100644 core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java create mode 100644 core/src/main/scala/org/apache/spark/network/netty/FileClient.scala rename core/src/main/{java/org/apache/spark/network/netty/FileClientChannelInitializer.java => scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala} (57%) rename core/src/main/{java/org/apache/spark/network/netty/FileClientHandler.java => scala/org/apache/spark/network/netty/FileClientHandler.scala} (51%) create mode 100644 core/src/main/scala/org/apache/spark/network/netty/FileServer.scala rename core/src/main/{java/org/apache/spark/network/netty/FileServerChannelInitializer.java => scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala} (54%) create mode 100644 core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala rename core/src/main/{java/org/apache/spark/network/netty/PathResolver.java => scala/org/apache/spark/network/netty/PathResolver.scala} (80%) mode change 100755 => 100644 diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java deleted file mode 100644 index 0d31894d6ec7a..0000000000000 --- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.network.netty; - -import java.util.concurrent.TimeUnit; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.oio.OioEventLoopGroup; -import io.netty.channel.socket.oio.OioSocketChannel; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class FileClient { - - private static final Logger LOG = LoggerFactory.getLogger(FileClient.class.getName()); - - private final FileClientHandler handler; - private Channel channel = null; - private Bootstrap bootstrap = null; - private EventLoopGroup group = null; - private final int connectTimeout; - private final int sendTimeout = 60; // 1 min - - FileClient(FileClientHandler handler, int connectTimeout) { - this.handler = handler; - this.connectTimeout = connectTimeout; - } - - public void init() { - group = new OioEventLoopGroup(); - bootstrap = new Bootstrap(); - bootstrap.group(group) - .channel(OioSocketChannel.class) - .option(ChannelOption.SO_KEEPALIVE, true) - .option(ChannelOption.TCP_NODELAY, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout) - .handler(new FileClientChannelInitializer(handler)); - } - - public void connect(String host, int port) { - try { - // Start the connection attempt. - channel = bootstrap.connect(host, port).sync().channel(); - // ChannelFuture cf = channel.closeFuture(); - //cf.addListener(new ChannelCloseListener(this)); - } catch (InterruptedException e) { - LOG.warn("FileClient interrupted while trying to connect", e); - close(); - } - } - - public void waitForClose() { - try { - channel.closeFuture().sync(); - } catch (InterruptedException e) { - LOG.warn("FileClient interrupted", e); - } - } - - public void sendRequest(String file) { - //assert(file == null); - //assert(channel == null); - try { - // Should be able to send the message to network link channel. - boolean bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS); - if (!bSent) { - throw new RuntimeException("Failed to send"); - } - } catch (InterruptedException e) { - LOG.error("Error", e); - } - } - - public void close() { - if (group != null) { - group.shutdownGracefully(); - group = null; - bootstrap = null; - } - } -} diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServer.java b/core/src/main/java/org/apache/spark/network/netty/FileServer.java deleted file mode 100644 index c93425e2787dc..0000000000000 --- a/core/src/main/java/org/apache/spark/network/netty/FileServer.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.network.netty; - -import java.net.InetSocketAddress; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.oio.OioEventLoopGroup; -import io.netty.channel.socket.oio.OioServerSocketChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Server that accept the path of a file an echo back its content. - */ -class FileServer { - - private static final Logger LOG = LoggerFactory.getLogger(FileServer.class.getName()); - - private EventLoopGroup bossGroup = null; - private EventLoopGroup workerGroup = null; - private ChannelFuture channelFuture = null; - private int port = 0; - - FileServer(PathResolver pResolver, int port) { - InetSocketAddress addr = new InetSocketAddress(port); - - // Configure the server. - bossGroup = new OioEventLoopGroup(); - workerGroup = new OioEventLoopGroup(); - - ServerBootstrap bootstrap = new ServerBootstrap(); - bootstrap.group(bossGroup, workerGroup) - .channel(OioServerSocketChannel.class) - .option(ChannelOption.SO_BACKLOG, 100) - .option(ChannelOption.SO_RCVBUF, 1500) - .childHandler(new FileServerChannelInitializer(pResolver)); - // Start the server. - channelFuture = bootstrap.bind(addr); - try { - // Get the address we bound to. - InetSocketAddress boundAddress = - ((InetSocketAddress) channelFuture.sync().channel().localAddress()); - this.port = boundAddress.getPort(); - } catch (InterruptedException ie) { - this.port = 0; - } - } - - /** - * Start the file server asynchronously in a new thread. - */ - public void start() { - Thread blockingThread = new Thread() { - @Override - public void run() { - try { - channelFuture.channel().closeFuture().sync(); - LOG.info("FileServer exiting"); - } catch (InterruptedException e) { - LOG.error("File server start got interrupted", e); - } - // NOTE: bootstrap is shutdown in stop() - } - }; - blockingThread.setDaemon(true); - blockingThread.start(); - } - - public int getPort() { - return port; - } - - public void stop() { - // Close the bound channel. - if (channelFuture != null) { - channelFuture.channel().close().awaitUninterruptibly(); - channelFuture = null; - } - - // Shutdown event groups - if (bossGroup != null) { - bossGroup.shutdownGracefully(); - bossGroup = null; - } - - if (workerGroup != null) { - workerGroup.shutdownGracefully(); - workerGroup = null; - } - // TODO: Shutdown all accepted channels as well ? - } -} diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java b/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java deleted file mode 100644 index c0133e19c7f79..0000000000000 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.network.netty; - -import java.io.File; -import java.io.FileInputStream; - -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.DefaultFileRegion; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.spark.storage.BlockId; -import org.apache.spark.storage.FileSegment; - -class FileServerHandler extends SimpleChannelInboundHandler { - - private static final Logger LOG = LoggerFactory.getLogger(FileServerHandler.class.getName()); - - private final PathResolver pResolver; - - FileServerHandler(PathResolver pResolver){ - this.pResolver = pResolver; - } - - @Override - public void channelRead0(ChannelHandlerContext ctx, String blockIdString) { - BlockId blockId = BlockId.apply(blockIdString); - FileSegment fileSegment = pResolver.getBlockLocation(blockId); - // if getBlockLocation returns null, close the channel - if (fileSegment == null) { - //ctx.close(); - return; - } - File file = fileSegment.file(); - if (file.exists()) { - if (!file.isFile()) { - ctx.write(new FileHeader(0, blockId).buffer()); - ctx.flush(); - return; - } - long length = fileSegment.length(); - if (length > Integer.MAX_VALUE || length <= 0) { - ctx.write(new FileHeader(0, blockId).buffer()); - ctx.flush(); - return; - } - int len = (int) length; - ctx.write((new FileHeader(len, blockId)).buffer()); - try { - ctx.write(new DefaultFileRegion(new FileInputStream(file) - .getChannel(), fileSegment.offset(), fileSegment.length())); - } catch (Exception e) { - LOG.error("Exception: ", e); - } - } else { - ctx.write(new FileHeader(0, blockId).buffer()); - } - ctx.flush(); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - LOG.error("Exception: ", cause); - ctx.close(); - } -} diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala b/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala new file mode 100644 index 0000000000000..c6d35f73db545 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/network/netty/FileClient.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.netty + +import java.util.concurrent.TimeUnit + +import io.netty.bootstrap.Bootstrap +import io.netty.channel.{Channel, ChannelOption, EventLoopGroup} +import io.netty.channel.oio.OioEventLoopGroup +import io.netty.channel.socket.oio.OioSocketChannel + +import org.apache.spark.Logging + +class FileClient(handler: FileClientHandler, connectTimeout: Int) extends Logging { + + private var channel: Channel = _ + private var bootstrap: Bootstrap = _ + private var group: EventLoopGroup = _ + private val sendTimeout = 60 + + def init(): Unit = { + group = new OioEventLoopGroup + bootstrap = new Bootstrap + bootstrap.group(group) + .channel(classOf[OioSocketChannel]) + .option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE) + .option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(connectTimeout)) + .handler(new FileClientChannelInitializer(handler)) + } + + def connect(host: String, port: Int) { + try { + channel = bootstrap.connect(host, port).sync().channel() + } catch { + case e: InterruptedException => + logWarning("FileClient interrupted while trying to connect", e) + close() + } + } + + def waitForClose(): Unit = { + try { + channel.closeFuture.sync() + } catch { + case e: InterruptedException => + logWarning("FileClient interrupted", e) + } + } + + def sendRequest(file: String): Unit = { + try { + val bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS) + if (!bSent) { + throw new RuntimeException("Failed to send") + } + } catch { + case e: InterruptedException => + logError("Error", e) + } + } + + def close(): Unit = { + if (group != null) { + group.shutdownGracefully() + group = null + bootstrap = null + } + } +} diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java b/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala similarity index 57% rename from core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java rename to core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala index 264cf97d0209f..f4261c13f70a8 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java +++ b/core/src/main/scala/org/apache/spark/network/netty/FileClientChannelInitializer.scala @@ -15,25 +15,17 @@ * limitations under the License. */ -package org.apache.spark.network.netty; +package org.apache.spark.network.netty -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.string.StringEncoder; +import io.netty.channel.ChannelInitializer +import io.netty.channel.socket.SocketChannel +import io.netty.handler.codec.string.StringEncoder -class FileClientChannelInitializer extends ChannelInitializer { - private final FileClientHandler fhandler; +class FileClientChannelInitializer(handler: FileClientHandler) + extends ChannelInitializer[SocketChannel] { - FileClientChannelInitializer(FileClientHandler handler) { - fhandler = handler; - } - - @Override - public void initChannel(SocketChannel channel) { - // file no more than 2G - channel.pipeline() - .addLast("encoder", new StringEncoder()) - .addLast("handler", fhandler); + def initChannel(channel: SocketChannel) { + channel.pipeline.addLast("encoder", new StringEncoder).addLast("handler", handler) } } diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java b/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala similarity index 51% rename from core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java rename to core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala index 63d3d927255f9..017302ec7d33d 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java +++ b/core/src/main/scala/org/apache/spark/network/netty/FileClientHandler.scala @@ -15,41 +15,36 @@ * limitations under the License. */ -package org.apache.spark.network.netty; +package org.apache.spark.network.netty -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.buffer.ByteBuf +import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler} -import org.apache.spark.storage.BlockId; +import org.apache.spark.storage.BlockId -abstract class FileClientHandler extends SimpleChannelInboundHandler { - private FileHeader currentHeader = null; +abstract class FileClientHandler extends SimpleChannelInboundHandler[ByteBuf] { - private volatile boolean handlerCalled = false; + private var currentHeader: FileHeader = null - public boolean isComplete() { - return handlerCalled; - } + @volatile + private var handlerCalled: Boolean = false + + def isComplete: Boolean = handlerCalled + + def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) - public abstract void handle(ChannelHandlerContext ctx, ByteBuf in, FileHeader header); - public abstract void handleError(BlockId blockId); + def handleError(blockId: BlockId) - @Override - public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { - // get header - if (currentHeader == null && in.readableBytes() >= FileHeader.HEADER_SIZE()) { - currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE())); + override def channelRead0(ctx: ChannelHandlerContext, in: ByteBuf) { + if (currentHeader == null && in.readableBytes >= FileHeader.HEADER_SIZE) { + currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE)) } - // get file - if(in.readableBytes() >= currentHeader.fileLen()) { - handle(ctx, in, currentHeader); - handlerCalled = true; - currentHeader = null; - ctx.close(); + if (in.readableBytes >= currentHeader.fileLen) { + handle(ctx, in, currentHeader) + handlerCalled = true + currentHeader = null + ctx.close() } } - } - diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala index 136c1912045aa..607e560ff277f 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/FileHeader.scala @@ -26,7 +26,7 @@ private[spark] class FileHeader ( val fileLen: Int, val blockId: BlockId) extends Logging { - lazy val buffer = { + lazy val buffer: ByteBuf = { val buf = Unpooled.buffer() buf.capacity(FileHeader.HEADER_SIZE) buf.writeInt(fileLen) @@ -62,11 +62,10 @@ private[spark] object FileHeader { new FileHeader(length, blockId) } - def main (args:Array[String]) { + def main(args:Array[String]) { val header = new FileHeader(25, TestBlockId("my_block")) val buf = header.buffer val newHeader = FileHeader.create(buf) System.out.println("id=" + newHeader.blockId + ",size=" + newHeader.fileLen) } } - diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala b/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala new file mode 100644 index 0000000000000..dff77950659af --- /dev/null +++ b/core/src/main/scala/org/apache/spark/network/netty/FileServer.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.netty + +import java.net.InetSocketAddress + +import io.netty.bootstrap.ServerBootstrap +import io.netty.channel.{ChannelFuture, ChannelOption, EventLoopGroup} +import io.netty.channel.oio.OioEventLoopGroup +import io.netty.channel.socket.oio.OioServerSocketChannel + +import org.apache.spark.Logging + +/** + * Server that accept the path of a file an echo back its content. + */ +class FileServer(pResolver: PathResolver, private var port: Int) extends Logging { + + private val addr: InetSocketAddress = new InetSocketAddress(port) + private var bossGroup: EventLoopGroup = new OioEventLoopGroup + private var workerGroup: EventLoopGroup = new OioEventLoopGroup + + private var channelFuture: ChannelFuture = { + val bootstrap = new ServerBootstrap + bootstrap.group(bossGroup, workerGroup) + .channel(classOf[OioServerSocketChannel]) + .option(ChannelOption.SO_BACKLOG, java.lang.Integer.valueOf(100)) + .option(ChannelOption.SO_RCVBUF, java.lang.Integer.valueOf(1500)) + .childHandler(new FileServerChannelInitializer(pResolver)) + bootstrap.bind(addr) + } + + try { + val boundAddress = channelFuture.sync.channel.localAddress.asInstanceOf[InetSocketAddress] + port = boundAddress.getPort + } catch { + case ie: InterruptedException => + port = 0 + } + + /** Start the file server asynchronously in a new thread. */ + def start(): Unit = { + val blockingThread: Thread = new Thread { + override def run(): Unit = { + try { + channelFuture.channel.closeFuture.sync + logInfo("FileServer exiting") + } catch { + case e: InterruptedException => + logError("File server start got interrupted", e) + } + // NOTE: bootstrap is shutdown in stop() + } + } + blockingThread.setDaemon(true) + blockingThread.start() + } + + def getPort: Int = port + + def stop(): Unit = { + if (channelFuture != null) { + channelFuture.channel().close().awaitUninterruptibly() + channelFuture = null + } + if (bossGroup != null) { + bossGroup.shutdownGracefully() + bossGroup = null + } + if (workerGroup != null) { + workerGroup.shutdownGracefully() + workerGroup = null + } + } +} + diff --git a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java b/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala similarity index 54% rename from core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java rename to core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala index 46efec8f8d963..aaa2f913d0269 100644 --- a/core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java +++ b/core/src/main/scala/org/apache/spark/network/netty/FileServerChannelInitializer.scala @@ -15,27 +15,20 @@ * limitations under the License. */ -package org.apache.spark.network.netty; +package org.apache.spark.network.netty -import io.netty.channel.ChannelInitializer; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.codec.DelimiterBasedFrameDecoder; -import io.netty.handler.codec.Delimiters; -import io.netty.handler.codec.string.StringDecoder; +import io.netty.channel.ChannelInitializer +import io.netty.channel.socket.SocketChannel +import io.netty.handler.codec.{DelimiterBasedFrameDecoder, Delimiters} +import io.netty.handler.codec.string.StringDecoder -class FileServerChannelInitializer extends ChannelInitializer { +class FileServerChannelInitializer(pResolver: PathResolver) + extends ChannelInitializer[SocketChannel] { - private final PathResolver pResolver; - - FileServerChannelInitializer(PathResolver pResolver) { - this.pResolver = pResolver; - } - - @Override - public void initChannel(SocketChannel channel) { - channel.pipeline() - .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())) - .addLast("stringDecoder", new StringDecoder()) - .addLast("handler", new FileServerHandler(pResolver)); + override def initChannel(channel: SocketChannel): Unit = { + channel.pipeline + .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter : _*)) + .addLast("stringDecoder", new StringDecoder) + .addLast("handler", new FileServerHandler(pResolver)) } } diff --git a/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala b/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala new file mode 100644 index 0000000000000..96f60b2883ad9 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/network/netty/FileServerHandler.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.netty + +import java.io.FileInputStream + +import io.netty.channel.{DefaultFileRegion, ChannelHandlerContext, SimpleChannelInboundHandler} + +import org.apache.spark.Logging +import org.apache.spark.storage.{BlockId, FileSegment} + + +class FileServerHandler(pResolver: PathResolver) + extends SimpleChannelInboundHandler[String] with Logging { + + override def channelRead0(ctx: ChannelHandlerContext, blockIdString: String): Unit = { + val blockId: BlockId = BlockId(blockIdString) + val fileSegment: FileSegment = pResolver.getBlockLocation(blockId) + if (fileSegment == null) { + return + } + val file = fileSegment.file + if (file.exists) { + if (!file.isFile) { + ctx.write(new FileHeader(0, blockId).buffer) + ctx.flush() + return + } + val length: Long = fileSegment.length + if (length > Integer.MAX_VALUE || length <= 0) { + ctx.write(new FileHeader(0, blockId).buffer) + ctx.flush() + return + } + ctx.write(new FileHeader(length.toInt, blockId).buffer) + try { + val channel = new FileInputStream(file).getChannel + ctx.write(new DefaultFileRegion(channel, fileSegment.offset, fileSegment.length)) + } catch { + case e: Exception => + logError("Exception: ", e) + } + } else { + ctx.write(new FileHeader(0, blockId).buffer) + } + ctx.flush() + } + + override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { + logError("Exception: ", cause) + ctx.close() + } +} diff --git a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java b/core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala old mode 100755 new mode 100644 similarity index 80% rename from core/src/main/java/org/apache/spark/network/netty/PathResolver.java rename to core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala index 7ad8d03efbadc..0d7695072a7b1 --- a/core/src/main/java/org/apache/spark/network/netty/PathResolver.java +++ b/core/src/main/scala/org/apache/spark/network/netty/PathResolver.scala @@ -15,12 +15,11 @@ * limitations under the License. */ -package org.apache.spark.network.netty; +package org.apache.spark.network.netty -import org.apache.spark.storage.BlockId; -import org.apache.spark.storage.FileSegment; +import org.apache.spark.storage.{BlockId, FileSegment} -public interface PathResolver { +trait PathResolver { /** Get the file segment in which the given block resides. */ - FileSegment getBlockLocation(BlockId blockId); + def getBlockLocation(blockId: BlockId): FileSegment } diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala index 7ef7aecc6a9fb..95958e30f7eeb 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleSender.scala @@ -32,7 +32,7 @@ private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) ext server.stop() } - def port: Int = server.getPort() + def port: Int = server.getPort }