
Commit

[SPARK-44993][CORE] Move compareChecksums from ShuffleChecksumTestHelper to ShuffleChecksumUtils

dongjoon-hyun committed Aug 28, 2023
1 parent 8e779d1 commit ed8b876
Showing 2 changed files with 83 additions and 46 deletions.
@@ -0,0 +1,80 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle

import java.io.{DataInputStream, File, FileInputStream}
import java.util.zip.CheckedInputStream

import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper
import org.apache.spark.network.util.LimitedInputStream

object ShuffleChecksumUtils {

  /**
   * Ensure that the checksum values are consistent with index file and data file.
   */
  def compareChecksums(
      numPartition: Int,
      algorithm: String,
      checksum: File,
      data: File,
      index: File): Boolean = {
    var checksumIn: DataInputStream = null
    val expectChecksums = Array.ofDim[Long](numPartition)
    try {
      checksumIn = new DataInputStream(new FileInputStream(checksum))
      (0 until numPartition).foreach(i => expectChecksums(i) = checksumIn.readLong())
    } finally {
      if (checksumIn != null) {
        checksumIn.close()
      }
    }

    var dataIn: FileInputStream = null
    var indexIn: DataInputStream = null
    var checkedIn: CheckedInputStream = null
    try {
      dataIn = new FileInputStream(data)
      indexIn = new DataInputStream(new FileInputStream(index))
      var prevOffset = indexIn.readLong
      (0 until numPartition).foreach { i =>
        val curOffset = indexIn.readLong
        val limit = (curOffset - prevOffset).toInt
        val bytes = new Array[Byte](limit)
        val checksumCal = ShuffleChecksumHelper.getChecksumByAlgorithm(algorithm)
        checkedIn = new CheckedInputStream(
          new LimitedInputStream(dataIn, curOffset - prevOffset), checksumCal)
        checkedIn.read(bytes, 0, limit)
        prevOffset = curOffset
        // checksum must be consistent at both write and read sides
        if (checkedIn.getChecksum.getValue != expectChecksums(i)) return false
      }
    } finally {
      if (dataIn != null) {
        dataIn.close()
      }
      if (indexIn != null) {
        indexIn.close()
      }
      if (checkedIn != null) {
        checkedIn.close()
      }
    }
    true
  }
}
@@ -17,11 +17,7 @@
 
 package org.apache.spark.shuffle
 
-import java.io.{DataInputStream, File, FileInputStream}
-import java.util.zip.CheckedInputStream
-
-import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper
-import org.apache.spark.network.util.LimitedInputStream
+import java.io.File
 
 trait ShuffleChecksumTestHelper {
 
@@ -38,46 +34,7 @@ trait ShuffleChecksumTestHelper
     assert(data.exists(), "Data file doesn't exist")
     assert(index.exists(), "Index file doesn't exist")
 
-    var checksumIn: DataInputStream = null
-    val expectChecksums = Array.ofDim[Long](numPartition)
-    try {
-      checksumIn = new DataInputStream(new FileInputStream(checksum))
-      (0 until numPartition).foreach(i => expectChecksums(i) = checksumIn.readLong())
-    } finally {
-      if (checksumIn != null) {
-        checksumIn.close()
-      }
-    }
-
-    var dataIn: FileInputStream = null
-    var indexIn: DataInputStream = null
-    var checkedIn: CheckedInputStream = null
-    try {
-      dataIn = new FileInputStream(data)
-      indexIn = new DataInputStream(new FileInputStream(index))
-      var prevOffset = indexIn.readLong
-      (0 until numPartition).foreach { i =>
-        val curOffset = indexIn.readLong
-        val limit = (curOffset - prevOffset).toInt
-        val bytes = new Array[Byte](limit)
-        val checksumCal = ShuffleChecksumHelper.getChecksumByAlgorithm(algorithm)
-        checkedIn = new CheckedInputStream(
-          new LimitedInputStream(dataIn, curOffset - prevOffset), checksumCal)
-        checkedIn.read(bytes, 0, limit)
-        prevOffset = curOffset
-        // checksum must be consistent at both write and read sides
-        assert(checkedIn.getChecksum.getValue == expectChecksums(i))
-      }
-    } finally {
-      if (dataIn != null) {
-        dataIn.close()
-      }
-      if (indexIn != null) {
-        indexIn.close()
-      }
-      if (checkedIn != null) {
-        checkedIn.close()
-      }
-    }
+    assert(ShuffleChecksumUtils.compareChecksums(numPartition, algorithm, checksum, data, index),
+      "checksum must be consistent at both write and read sides")
   }
 }
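
For illustration, below is a minimal, hypothetical sketch of how a test or debugging harness might call the relocated utility directly. The directory, file names, partition count, and algorithm string are assumptions for the example and must match whatever the shuffle writer actually produced; only ShuffleChecksumUtils.compareChecksums itself comes from this commit.

import java.io.File

import org.apache.spark.shuffle.ShuffleChecksumUtils

object ShuffleChecksumCheckExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical shuffle output files; real names and locations depend on
    // how the shuffle writer and disk block manager laid them out.
    val dir = new File("/tmp/spark-shuffle-output")
    val checksum = new File(dir, "shuffle_0_0_0.checksum.ADLER32")
    val data = new File(dir, "shuffle_0_0_0.data")
    val index = new File(dir, "shuffle_0_0_0.index")

    // Returns true only if every partition's recomputed checksum matches the
    // value stored in the checksum file.
    val consistent = ShuffleChecksumUtils.compareChecksums(
      4,          // assumed number of reduce partitions recorded in the files
      "ADLER32",  // assumed to be the same algorithm used on the write side
      checksum, data, index)

    assert(consistent, "checksum must be consistent at both write and read sides")
  }
}

One apparent benefit of hosting the method in an object rather than the test trait is that callers like the sketch above can reuse it without mixing in ShuffleChecksumTestHelper.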
