Revert "[SPARK-2468] Netty based block server / client module"
This reverts commit 3a8b68b.
pwendell committed Aug 15, 2014
1 parent 7589c39 commit fd9fcd2
Showing 29 changed files with 667 additions and 2,770 deletions.
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.netty

import java.util.concurrent.TimeUnit

import io.netty.bootstrap.Bootstrap
import io.netty.channel.{Channel, ChannelOption, EventLoopGroup}
import io.netty.channel.oio.OioEventLoopGroup
import io.netty.channel.socket.oio.OioSocketChannel

import org.apache.spark.Logging

class FileClient(handler: FileClientHandler, connectTimeout: Int) extends Logging {

  private var channel: Channel = _
  private var bootstrap: Bootstrap = _
  private var group: EventLoopGroup = _
  private val sendTimeout = 60  // seconds, used by sendRequest's await

  def init(): Unit = {
    group = new OioEventLoopGroup
    bootstrap = new Bootstrap
    bootstrap.group(group)
      .channel(classOf[OioSocketChannel])
      .option(ChannelOption.SO_KEEPALIVE, java.lang.Boolean.TRUE)
      .option(ChannelOption.TCP_NODELAY, java.lang.Boolean.TRUE)
      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(connectTimeout))
      .handler(new FileClientChannelInitializer(handler))
  }

  def connect(host: String, port: Int): Unit = {
    try {
      channel = bootstrap.connect(host, port).sync().channel()
    } catch {
      case e: InterruptedException =>
        logWarning("FileClient interrupted while trying to connect", e)
        close()
    }
  }

  def waitForClose(): Unit = {
    try {
      channel.closeFuture.sync()
    } catch {
      case e: InterruptedException =>
        logWarning("FileClient interrupted", e)
    }
  }

  def sendRequest(file: String): Unit = {
    try {
      // Requests are newline-terminated strings, encoded to bytes by the
      // pipeline's StringEncoder. await() returns false if the write does not
      // complete within sendTimeout seconds.
      val bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS)
      if (!bSent) {
        throw new RuntimeException("Failed to send")
      }
    } catch {
      case e: InterruptedException =>
        logError("FileClient interrupted while sending request", e)
    }
  }

  def close(): Unit = {
    if (group != null) {
      group.shutdownGracefully()
      group = null
      bootstrap = null
    }
  }
}
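For orientation, here is a hedged usage sketch of the client above. The handler subclass, the object wrapping main(), and the host/port/block-id values are hypothetical; FileClient, FileClientHandler, FileHeader, and BlockId come from this commit.

import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext

import org.apache.spark.storage.BlockId

// Hypothetical handler: drains the payload and reports its size.
class PrintingClientHandler extends FileClientHandler {
  override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) {
    val bytes = new Array[Byte](header.fileLen)
    in.readBytes(bytes)
    println(s"Fetched ${bytes.length} bytes for block ${header.blockId}")
  }

  override def handleError(blockId: BlockId) {
    println(s"Failed to fetch block $blockId")
  }
}

object FileClientExample {
  def main(args: Array[String]): Unit = {
    val client = new FileClient(new PrintingClientHandler, connectTimeout = 60000)
    client.init()
    client.connect("localhost", 12345)   // placeholder host and port
    client.sendRequest("test_my_block")  // sendRequest appends the "\r\n" framing
    client.waitForClose()                // channelRead0 closes the channel after one response
    client.close()
  }
}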
@@ -15,18 +15,17 @@
* limitations under the License.
*/

-package org.apache.spark.storage
+package org.apache.spark.network.netty

-import java.nio.ByteBuffer
+import io.netty.channel.ChannelInitializer
+import io.netty.channel.socket.SocketChannel
+import io.netty.handler.codec.string.StringEncoder

-/**
- * An interface for providing data for blocks.
- *
- * getBlockData returns either a FileSegment (for zero-copy send), or a ByteBuffer.
- *
- * Aside from unit tests, [[BlockManager]] is the main class that implements this.
- */
-private[spark] trait BlockDataProvider {
-  def getBlockData(blockId: String): Either[FileSegment, ByteBuffer]
-}
+class FileClientChannelInitializer(handler: FileClientHandler)
+  extends ChannelInitializer[SocketChannel] {
+
+  def initChannel(channel: SocketChannel) {
+    channel.pipeline.addLast("encoder", new StringEncoder).addLast("handler", handler)
+  }
+}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.netty

import io.netty.buffer.ByteBuf
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}

import org.apache.spark.storage.BlockId


abstract class FileClientHandler extends SimpleChannelInboundHandler[ByteBuf] {

  private var currentHeader: FileHeader = null

  @volatile
  private var handlerCalled: Boolean = false

  def isComplete: Boolean = handlerCalled

  def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader)

  def handleError(blockId: BlockId)

  override def channelRead0(ctx: ChannelHandlerContext, in: ByteBuf) {
    if (currentHeader == null && in.readableBytes >= FileHeader.HEADER_SIZE) {
      currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE))
    }
    // Consume the body only once the header has been parsed and the full
    // payload has arrived; the null check guards against a short first read.
    if (currentHeader != null && in.readableBytes >= currentHeader.fileLen) {
      handle(ctx, in, currentHeader)
      handlerCalled = true
      currentHeader = null
      ctx.close()
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.netty

import io.netty.buffer._

import org.apache.spark.Logging
import org.apache.spark.storage.{BlockId, TestBlockId}

private[spark] class FileHeader(
    val fileLen: Int,
    val blockId: BlockId) extends Logging {

  lazy val buffer: ByteBuf = {
    val buf = Unpooled.buffer()
    buf.capacity(FileHeader.HEADER_SIZE)
    buf.writeInt(fileLen)
    buf.writeInt(blockId.name.length)
    blockId.name.foreach((x: Char) => buf.writeByte(x))
    // Pad the rest of the header with zeros; a header that exactly fills
    // HEADER_SIZE needs no padding and is still valid.
    val padding = FileHeader.HEADER_SIZE - buf.readableBytes
    if (padding >= 0) {
      buf.writeZero(padding)
    } else {
      throw new Exception("too long header: " + buf.readableBytes + " bytes")
    }
    buf
  }
}

private[spark] object FileHeader {

  val HEADER_SIZE = 40

  def getFileLenOffset = 0
  def getFileLenSize = Integer.SIZE / 8

  def create(buf: ByteBuf): FileHeader = {
    val length = buf.readInt
    val idLength = buf.readInt
    val idBuilder = new StringBuilder(idLength)
    for (i <- 1 to idLength) {
      idBuilder += buf.readByte().asInstanceOf[Char]
    }
    val blockId = BlockId(idBuilder.toString())
    new FileHeader(length, blockId)
  }

  def main(args: Array[String]) {
    val header = new FileHeader(25, TestBlockId("my_block"))
    val buf = header.buffer
    val newHeader = FileHeader.create(buf)
    System.out.println("id=" + newHeader.blockId + ",size=" + newHeader.fileLen)
  }
}
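To make the layout concrete: for the header built in main() above, TestBlockId("my_block").name is "test_my_block" (13 characters, per Spark's BlockId naming), so the 40 bytes break down as in this sketch.

// Byte layout of new FileHeader(25, TestBlockId("my_block")).buffer:
//   bytes  0-3   fileLen      = 25
//   bytes  4-7   name length  = 13
//   bytes  8-20  name bytes   = "test_my_block"
//   bytes 21-39  zero padding (HEADER_SIZE = 40)

// Minimal round-trip check (mirrors main() above):
val buf = new FileHeader(25, TestBlockId("my_block")).buffer
assert(buf.readableBytes == FileHeader.HEADER_SIZE)       // headers are always exactly 40 bytes
assert(FileHeader.create(buf.duplicate()).fileLen == 25)  // the length survives the round trip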
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.netty

import java.net.InetSocketAddress

import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.{ChannelFuture, ChannelOption, EventLoopGroup}
import io.netty.channel.oio.OioEventLoopGroup
import io.netty.channel.socket.oio.OioServerSocketChannel

import org.apache.spark.Logging

/**
 * Server that accepts the path of a file and echoes back its content.
 */
class FileServer(pResolver: PathResolver, private var port: Int) extends Logging {

  private val addr: InetSocketAddress = new InetSocketAddress(port)
  private var bossGroup: EventLoopGroup = new OioEventLoopGroup
  private var workerGroup: EventLoopGroup = new OioEventLoopGroup

  private var channelFuture: ChannelFuture = {
    val bootstrap = new ServerBootstrap
    bootstrap.group(bossGroup, workerGroup)
      .channel(classOf[OioServerSocketChannel])
      .option(ChannelOption.SO_BACKLOG, java.lang.Integer.valueOf(100))
      .option(ChannelOption.SO_RCVBUF, java.lang.Integer.valueOf(1500))
      .childHandler(new FileServerChannelInitializer(pResolver))
    bootstrap.bind(addr)
  }

  try {
    // If port 0 was requested, record the ephemeral port the OS assigned.
    val boundAddress = channelFuture.sync.channel.localAddress.asInstanceOf[InetSocketAddress]
    port = boundAddress.getPort
  } catch {
    case ie: InterruptedException =>
      port = 0
  }

  /** Start the file server asynchronously in a new thread. */
  def start(): Unit = {
    val blockingThread: Thread = new Thread {
      override def run(): Unit = {
        try {
          channelFuture.channel.closeFuture.sync
          logInfo("FileServer exiting")
        } catch {
          case e: InterruptedException =>
            logError("File server start got interrupted", e)
        }
        // NOTE: bootstrap is shutdown in stop()
      }
    }
    blockingThread.setDaemon(true)
    blockingThread.start()
  }

  def getPort: Int = port

  def stop(): Unit = {
    if (channelFuture != null) {
      channelFuture.channel().close().awaitUninterruptibly()
      channelFuture = null
    }
    if (bossGroup != null) {
      bossGroup.shutdownGracefully()
      bossGroup = null
    }
    if (workerGroup != null) {
      workerGroup.shutdownGracefully()
      workerGroup = null
    }
  }
}
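A hedged usage sketch for the server: PathResolver and FileSegment are defined elsewhere in this commit, and the getBlockLocation(blockId: BlockId): FileSegment signature assumed below is taken on trust; the resolver, directory, and wrapper object are hypothetical.

import java.io.File

import org.apache.spark.storage.{BlockId, FileSegment}

object FileServerExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical resolver that serves every block from one fixed directory.
    val resolver = new PathResolver {
      override def getBlockLocation(blockId: BlockId): FileSegment = {
        val file = new File("/tmp/blocks", blockId.name)
        new FileSegment(file, 0, file.length())
      }
    }
    val server = new FileServer(resolver, port = 0)  // port 0 binds an ephemeral port
    server.start()
    println(s"FileServer listening on port ${server.getPort}")
    // ... clients fetch blocks by name ...
    server.stop()
  }
}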

@@ -15,26 +15,20 @@
* limitations under the License.
*/

-package org.apache.spark.network.netty.server
+package org.apache.spark.network.netty

 import io.netty.channel.ChannelInitializer
 import io.netty.channel.socket.SocketChannel
-import io.netty.handler.codec.LineBasedFrameDecoder
+import io.netty.handler.codec.{DelimiterBasedFrameDecoder, Delimiters}
 import io.netty.handler.codec.string.StringDecoder
-import io.netty.util.CharsetUtil
-import org.apache.spark.storage.BlockDataProvider

-/** Channel initializer that sets up the pipeline for the BlockServer. */
-private[netty]
-class BlockServerChannelInitializer(dataProvider: BlockDataProvider)
+class FileServerChannelInitializer(pResolver: PathResolver)
   extends ChannelInitializer[SocketChannel] {

-  override def initChannel(ch: SocketChannel): Unit = {
-    ch.pipeline
-      .addLast("frameDecoder", new LineBasedFrameDecoder(1024)) // max block id length 1024
-      .addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
-      .addLast("blockHeaderEncoder", new BlockHeaderEncoder)
-      .addLast("handler", new BlockServerHandler(dataProvider))
+  override def initChannel(channel: SocketChannel): Unit = {
+    channel.pipeline
+      .addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter : _*))
+      .addLast("stringDecoder", new StringDecoder)
+      .addLast("handler", new FileServerHandler(pResolver))
   }
 }
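Read together, the two initializers imply the wire protocol summarized below; the server-side handler behavior is inferred from FileServerHandler's role and the client-side parsing above, not shown verbatim in this excerpt.

// Request/response flow implied by the pipelines in this commit (summary sketch):
//
//   client  FileClient.sendRequest("test_my_block")
//             -> StringEncoder writes "test_my_block\r\n" to the socket
//   server  DelimiterBasedFrameDecoder splits on the line delimiter (frames <= 8192 bytes)
//             -> StringDecoder yields the block id string
//             -> FileServerHandler resolves it via PathResolver and writes the reply
//   client  FileClientHandler.channelRead0 reads the 40-byte FileHeader, waits for
//             fileLen payload bytes, invokes handle(...), and closes the channel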