Refactor of DiskStore for shuffle file consolidation
The main goal of this refactor was to allow the interposition of a new layer which
maps logical BlockIds to physical locations other than a file with the same name
as the BlockId. In particular, BlockIds will need to be mappable to chunks of files,
as multiple blocks will be stored in the same file.

In order to accomplish this, the following changes have been made:
- Creation of DiskBlockManager, which manages the association of logical BlockIds
  to physical disk locations (called FileSegments). By default, Blocks are simply
  mapped to physical files of the same name, as before.
- The DiskStore now indirects all requests for a given BlockId through the DiskBlockManager
  in order to resolve the actual File location.
- DiskBlockObjectWriter has been merged into BlockObjectWriter.
- The Netty PathResolver has been changed to map BlockIds into FileSegments, as this
  codepath is the only one that uses Netty, and that is likely to remain the case.

Overall, I think this refactor produces a clearer division between logical Blocks and
their physical on-disk locations. There is now an explicit (and documented) mapping
from one to the other.
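
As a rough sketch of the new mapping (only DiskBlockManager, FileSegment, and the
getBlockLocation signature come from this commit; the map-based resolver, paths,
offsets, and sizes below are made-up illustrations):

    // Hypothetical illustration: two shuffle blocks consolidated into one physical
    // file, addressed as (file, offset, length) rather than one file per block.
    import java.io.File
    import org.apache.spark.storage.{BlockId, FileSegment}

    val localDir = new File("/tmp/spark-local")                // assumed directory
    val consolidated = new File(localDir, "merged_shuffle_0")  // assumed file name
    val segments = Map(
      "shuffle_0_0_0" -> new FileSegment(consolidated, 0L, 4096L),
      "shuffle_0_1_0" -> new FileSegment(consolidated, 4096L, 8192L))

    // Blocks without an explicit mapping keep the old behavior: a file named
    // after the BlockId, spanning its full length.
    def locate(blockId: BlockId): FileSegment =
      segments.getOrElse(blockId.name, {
        val f = new File(localDir, blockId.name)
        new FileSegment(f, 0, f.length())
      })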
aarondav committed Oct 20, 2013
1 parent 747f538 commit 861dc40
Showing 10 changed files with 366 additions and 269 deletions.
@@ -25,6 +25,7 @@
import io.netty.channel.DefaultFileRegion;

import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.FileSegment;

class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {

@@ -37,40 +38,34 @@ public FileServerHandler(PathResolver pResolver){
@Override
public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
BlockId blockId = BlockId.apply(blockIdString);
String path = pResolver.getAbsolutePath(blockId.name());
// if getFilePath returns null, close the channel
if (path == null) {
FileSegment fileSegment = pResolver.getBlockLocation(blockId);
// if getBlockLocation returns null, close the channel
if (fileSegment == null) {
//ctx.close();
return;
}
File file = new File(path);
File file = fileSegment.file();
if (file.exists()) {
if (!file.isFile()) {
//logger.info("Not a file : " + file.getAbsolutePath());
ctx.write(new FileHeader(0, blockId).buffer());
ctx.flush();
return;
}
long length = file.length();
if (length > Integer.MAX_VALUE || length <= 0) {
//logger.info("too large file : " + file.getAbsolutePath() + " of size "+ length);
ctx.write(new FileHeader(0, blockId).buffer());
ctx.flush();
return;
}
int len = new Long(length).intValue();
//logger.info("Sending block "+blockId+" filelen = "+len);
//logger.info("header = "+ (new FileHeader(len, blockId)).buffer());
ctx.write((new FileHeader(len, blockId)).buffer());
try {
ctx.sendFile(new DefaultFileRegion(new FileInputStream(file)
.getChannel(), 0, file.length()));
.getChannel(), fileSegment.offset(), fileSegment.length()));
} catch (Exception e) {
//logger.warning("Exception when sending file : " + file.getAbsolutePath());
e.printStackTrace();
}
} else {
//logger.warning("File not found: " + file.getAbsolutePath());
ctx.write(new FileHeader(0, blockId).buffer());
}
ctx.flush();
@@ -17,13 +17,10 @@

package org.apache.spark.network.netty;

import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.FileSegment;

public interface PathResolver {
/**
* Get the absolute path of the file
*
* @param fileId
* @return the absolute path of file
*/
public String getAbsolutePath(String fileId);
/** Get the file segment in which the given Block resides. */
public FileSegment getBlockLocation(BlockId blockId);
}
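
FileSegment itself is defined in one of the files not shown in this excerpt; judging
from its use here (file(), offset(), length(), and a (File, offset, length) constructor),
it is presumably a small value class along these lines (an inferred sketch, not the
actual definition):

    // Inferred sketch of org.apache.spark.storage.FileSegment, which is not shown
    // in this diff: a reference to a contiguous byte range within a physical file.
    class FileSegment(val file: java.io.File, val offset: Long, val length: Long) {
      override def toString = "FileSegment(%s, %d, %d)".format(file.getName, offset, length)
    }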
@@ -21,7 +21,7 @@ import java.io.File

import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.spark.storage.BlockId
import org.apache.spark.storage.{BlockId, FileSegment}


private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {
@@ -54,8 +54,7 @@ private[spark] object ShuffleSender {
val localDirs = args.drop(2).map(new File(_))

val pResovler = new PathResolver {
override def getAbsolutePath(blockIdString: String): String = {
val blockId = BlockId(blockIdString)
override def getBlockLocation(blockId: BlockId): FileSegment = {
if (!blockId.isShuffle) {
throw new Exception("Block " + blockId + " is not a shuffle block")
}
@@ -65,7 +64,7 @@
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
val file = new File(subDir, blockId.name)
return file.getAbsolutePath
return new FileSegment(file, 0, file.length())
}
}
val sender = new ShuffleSender(port, pResovler)
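
For a concrete feel of the subdirectory hashing above, here is a hypothetical worked
example (the hash value, the number of local dirs, and subDirsPerLocalDir are assumed,
since they come from lines collapsed out of this excerpt):

    // Assumed inputs for illustration only.
    val hash = 12345                  // non-negative hash of blockId.name (assumed)
    val numLocalDirs = 2              // assumed localDirs.length
    val subDirsPerLocalDir = 64       // assumed value of the collapsed constant
    val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir  // (12345 / 2) % 64 = 28
    val subDirName = "%02x".format(subDirId)                   // "1c"
    // The shuffle block then resolves to <localDir>/1c/<blockId.name>, wrapped in a
    // FileSegment covering the whole file (offset 0, length = file.length()).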
@@ -167,7 +167,7 @@ private[spark] class ShuffleMapTask(
val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
val size = writer.size()
val size = writer.fileSegment().length
totalBytes += size
MapOutputTracker.compressSize(size)
}
33 changes: 24 additions & 9 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -28,7 +28,7 @@ import akka.dispatch.{Await, Future}
import akka.util.Duration
import akka.util.duration._

import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}

import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
@@ -102,18 +102,19 @@ private[spark] class BlockManager(
}

val shuffleBlockManager = new ShuffleBlockManager(this)
val diskBlockManager = new DiskBlockManager(
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))

private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]

private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: DiskStore =
new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
private[storage] val diskStore = new DiskStore(this, diskBlockManager)

// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
private val nettyPort: Int = {
val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt
if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0
if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
}

val connectionManager = new ConnectionManager(0)
@@ -567,16 +568,19 @@ private[spark] class BlockManager(

/**
* A short circuited method to get a block writer that can write data directly to disk.
* The Block will be appended to the File specified by filename.
* This is currently used for writing shuffle files out. Callers should handle error
* cases.
*/
def getDiskBlockWriter(blockId: BlockId, serializer: Serializer, bufferSize: Int)
def getDiskWriter(blockId: BlockId, filename: String, serializer: Serializer, bufferSize: Int)
: BlockObjectWriter = {
val writer = diskStore.getBlockWriter(blockId, serializer, bufferSize)
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true)
val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
writer.registerCloseEventHandler(() => {
val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
blockInfo.put(blockId, myInfo)
myInfo.markReady(writer.size())
myInfo.markReady(writer.fileSegment().length)
})
writer
}
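
A hedged sketch of how a caller is expected to use the new getDiskWriter signature
(the shuffle write path is the real caller; the helper below, the choice of file name,
and the buffer size are illustrative assumptions):

    // Illustrative only: append records for one block, commit, and report its size.
    import org.apache.spark.serializer.Serializer
    import org.apache.spark.storage.{BlockId, BlockManager}

    def writeBlockToDisk(blockManager: BlockManager, blockId: BlockId,
                         records: Iterator[Any], serializer: Serializer): Long = {
      // Using the block's own name as the file name mirrors the default mapping.
      val writer = blockManager.getDiskWriter(blockId, blockId.name, serializer, 32 * 1024)
      records.foreach(writer.write)
      writer.commit()
      writer.close()
      writer.fileSegment().length     // bytes committed for this block
    }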
@@ -988,13 +992,24 @@ private[spark] class BlockManager(
if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
}

/** Serializes into a stream. */
def dataSerializeStream(
blockId: BlockId,
outputStream: OutputStream,
values: Iterator[Any],
serializer: Serializer = defaultSerializer) {
val byteStream = new FastBufferedOutputStream(outputStream)
val ser = serializer.newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}

/** Serializes into a byte buffer. */
def dataSerialize(
blockId: BlockId,
values: Iterator[Any],
serializer: Serializer = defaultSerializer): ByteBuffer = {
val byteStream = new FastByteArrayOutputStream(4096)
val ser = serializer.newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
dataSerializeStream(blockId, byteStream, values, serializer)
byteStream.trim()
ByteBuffer.wrap(byteStream.array)
}
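
The split makes the two entry points explicit: dataSerializeStream writes straight to
any OutputStream, so values are never materialized as a single in-memory buffer, while
dataSerialize keeps the old ByteBuffer behavior by delegating to it. A brief hedged
illustration (blockManager, blockId, and values are assumed to be in scope, and the
file name is made up):

    // Illustrative only: the same serialization path with two destinations.
    val fileOut = new java.io.FileOutputStream("block.bin")  // assumed file name
    blockManager.dataSerializeStream(blockId, fileOut, values.iterator)
    fileOut.close()

    val buffer: java.nio.ByteBuffer = blockManager.dataSerialize(blockId, values.iterator)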
@@ -17,6 +17,13 @@

package org.apache.spark.storage

import java.io.{FileOutputStream, File, OutputStream}
import java.nio.channels.FileChannel

import it.unimi.dsi.fastutil.io.FastBufferedOutputStream

import org.apache.spark.Logging
import org.apache.spark.serializer.{SerializationStream, Serializer}

/**
* An interface for writing JVM objects to some underlying storage. This interface allows
@@ -59,7 +66,86 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
def write(value: Any)

/**
* Size of the valid writes, in bytes.
* Returns the file segment of committed data that this Writer has written.
*/
def size(): Long
def fileSegment(): FileSegment
}

/** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
class DiskBlockObjectWriter(
blockId: BlockId,
file: File,
serializer: Serializer,
bufferSize: Int,
compressStream: OutputStream => OutputStream)
extends BlockObjectWriter(blockId)
with Logging
{

/** The file channel, used for repositioning / truncating the file. */
private var channel: FileChannel = null
private var bs: OutputStream = null
private var objOut: SerializationStream = null
private var initialPosition = 0L
private var lastValidPosition = 0L
private var initialized = false

override def open(): BlockObjectWriter = {
val fos = new FileOutputStream(file, true)
channel = fos.getChannel()
initialPosition = channel.position
lastValidPosition = initialPosition
bs = compressStream(new FastBufferedOutputStream(fos, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
initialized = true
this
}

override def close() {
if (initialized) {
objOut.close()
channel = null
bs = null
objOut = null
}
super.close()
}

override def isOpen: Boolean = objOut != null

override def commit(): Long = {
if (initialized) {
// NOTE: Flush the serializer first and then the compressed/buffered output stream
objOut.flush()
bs.flush()
val prevPos = lastValidPosition
lastValidPosition = channel.position()
lastValidPosition - prevPos
} else {
// lastValidPosition is zero if stream is uninitialized
lastValidPosition
}
}

override def revertPartialWrites() {
if (initialized) {
// Discard current writes. We do this by flushing the outstanding writes and
// truncate the file to the last valid position.
objOut.flush()
bs.flush()
channel.truncate(lastValidPosition)
}
}

override def write(value: Any) {
if (!initialized) {
open()
}
objOut.writeObject(value)
}

override def fileSegment(): FileSegment = {
val bytesWritten = lastValidPosition - initialPosition
new FileSegment(file, initialPosition, bytesWritten)
}
}
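
A sketch of the commit/revert protocol this writer is built around (only write, commit,
and revertPartialWrites come from the class above; the helper and its error handling
are illustrative): bytes become part of the block's file segment only at commit(), and
anything written after the last commit can be rolled back by truncating the file.

    // Illustrative only: write a batch so that it is either fully committed or reverted.
    import org.apache.spark.storage.BlockObjectWriter

    def appendBatch(writer: BlockObjectWriter, batch: Seq[Any]): Long = {
      try {
        batch.foreach(writer.write)
        writer.commit()                 // bytes written since the previous commit
      } catch {
        case e: Exception =>
          writer.revertPartialWrites()  // truncate back to the last committed position
          throw e
      }
    }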