From b087eb10e41a45a7f4995da2ea362b1562c78b21 Mon Sep 17 00:00:00 2001 From: "Wang, Fei" Date: Mon, 18 Nov 2024 22:01:17 -0800 Subject: [PATCH] revert ut --- .../spark/shuffle/celeborn/SparkUtils.java | 4 +- .../spark/shuffle/celeborn/SparkUtils.java | 4 +- .../spark/CelebornFetchFailureSuite.scala | 79 ++----------------- .../scheduler/SparkSchedulerHelper.scala | 28 ------- 4 files changed, 12 insertions(+), 103 deletions(-) delete mode 100644 tests/spark-it/src/test/scala/org/apache/spark/scheduler/SparkSchedulerHelper.scala diff --git a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java index 4272f9830ef..401aa8148db 100644 --- a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java +++ b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java @@ -221,7 +221,7 @@ public static void cancelShuffle(int shuffleId, String reason) { .defaultAlwaysNull() .build(); - public static boolean taskAnotherAttemptRunningOrSuccessful(long taskId) { + public static synchronized boolean taskAnotherAttemptRunningOrSuccessful(long taskId) { if (SparkContext$.MODULE$.getActive().nonEmpty()) { TaskSchedulerImpl taskScheduler = (TaskSchedulerImpl) SparkContext$.MODULE$.getActive().get().taskScheduler(); @@ -238,7 +238,7 @@ public static boolean taskAnotherAttemptRunningOrSuccessful(long taskId) { .asJavaCollection().stream() .anyMatch( ti -> { - if ((ti.running() || ti.successful()) + if ((!ti.finished() || ti.successful()) && ti.attemptNumber() != taskInfo.attemptNumber()) { LOG.info("Another attempt of task {} is running: {}.", taskInfo, ti); return true; diff --git a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java index 05f6093df0f..4907c037cd5 100644 --- a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java +++ b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java @@ -337,7 +337,7 @@ public static void cancelShuffle(int shuffleId, String reason) { .defaultAlwaysNull() .build(); - public static boolean taskAnotherAttemptRunningOrSuccessful(long taskId) { + public static synchronized boolean taskAnotherAttemptRunningOrSuccessful(long taskId) { if (SparkContext$.MODULE$.getActive().nonEmpty()) { TaskSchedulerImpl taskScheduler = (TaskSchedulerImpl) SparkContext$.MODULE$.getActive().get().taskScheduler(); @@ -354,7 +354,7 @@ public static boolean taskAnotherAttemptRunningOrSuccessful(long taskId) { .asJavaCollection().stream() .anyMatch( ti -> { - if ((ti.running() || ti.successful()) + if ((!ti.finished() || ti.successful()) && ti.attemptNumber() != taskInfo.attemptNumber()) { LOG.info("Another attempt of task {} is running: {}.", taskInfo, ti); return true; diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala index f5e8b04c6f9..f3cd382118c 100644 --- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornFetchFailureSuite.scala @@ -23,7 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean import org.apache.spark.{BarrierTaskContext, ShuffleDependency, SparkConf, SparkContextHelper, SparkException, TaskContext} import org.apache.spark.celeborn.ExceptionMakerHelper import org.apache.spark.rdd.RDD -import org.apache.spark.scheduler.SparkSchedulerHelper import org.apache.spark.shuffle.ShuffleHandle import org.apache.spark.shuffle.celeborn.{CelebornShuffleHandle, ShuffleManagerHook, SparkShuffleManager, SparkUtils, TestCelebornShuffleManager} import org.apache.spark.sql.SparkSession @@ -57,8 +56,7 @@ class CelebornFetchFailureSuite extends AnyFunSuite super.createWorker(map, storageDir) } - class ShuffleReaderGetHook(conf: CelebornConf, speculation: Boolean = false) - extends ShuffleManagerHook { + class ShuffleReaderGetHook(conf: CelebornConf) extends ShuffleManagerHook { var executed: AtomicBoolean = new AtomicBoolean(false) val lock = new Object @@ -67,10 +65,6 @@ class CelebornFetchFailureSuite extends AnyFunSuite startPartition: Int, endPartition: Int, context: TaskContext): Unit = { - val taskIndex = SparkSchedulerHelper.getTaskIndex(context.taskAttemptId()) - if (speculation && taskIndex == 0) { - Thread.sleep(3000) // sleep for speculation - } if (executed.get() == true) return lock.synchronized { @@ -88,19 +82,17 @@ class CelebornFetchFailureSuite extends AnyFunSuite val allFiles = workerDirs.map(dir => { new File(s"$dir/celeborn-worker/shuffle_data/$appUniqueId/$celebornShuffleId") }) - val datafiles = allFiles.filter(_.exists()) - if (datafiles.nonEmpty) { - if (taskIndex == 0) { // only cleanup the data file in the task with index 0 - datafiles.foreach(_.delete()) - executed.set(true) - } - } else { - throw new RuntimeException("unexpected, there must be some data file" + - s" under ${workerDirs.mkString(",")}") + val datafile = allFiles.filter(_.exists()) + .flatMap(_.listFiles().iterator).headOption + datafile match { + case Some(file) => file.delete() + case None => throw new RuntimeException("unexpected, there must be some data file" + + s" under ${workerDirs.mkString(",")}") } } case _ => throw new RuntimeException("unexpected, only support RssShuffleHandle here") } + executed.set(true) } } } @@ -447,61 +439,6 @@ class CelebornFetchFailureSuite extends AnyFunSuite } } - test(s"celeborn spark integration test - do not rerun stage if task another attempt is running") { - val sparkConf = new SparkConf().setAppName("rss-demo").setMaster("local[2,3]") - val sparkSession = SparkSession.builder() - .config(updateSparkConf(sparkConf, ShuffleMode.HASH)) - .config("spark.sql.shuffle.partitions", 2) - .config("spark.celeborn.shuffle.forceFallback.partition.enabled", false) - .config("spark.celeborn.client.spark.fetch.throwsFetchFailure", "true") - .config( - "spark.shuffle.manager", - "org.apache.spark.shuffle.celeborn.TestCelebornShuffleManager") - .config("spark.speculation", "true") - .config("spark.speculation.multiplier", "2") - .config("spark.speculation.quantile", "0") - .getOrCreate() - - val shuffleMgr = SparkContextHelper.env - .shuffleManager - .asInstanceOf[TestCelebornShuffleManager] - var preventUnnecessaryStageRerun = false - val lifecycleManager = shuffleMgr.getLifecycleManager - lifecycleManager.registerReportTaskShuffleFetchFailurePreCheck(new java.util.function.Function[ - java.lang.Long, - Boolean] { - override def apply(taskId: java.lang.Long): Boolean = { - val anotherRunningOrSuccessful = SparkUtils.taskAnotherAttemptRunningOrSuccessful(taskId) - if (anotherRunningOrSuccessful) { - preventUnnecessaryStageRerun = true - } - !anotherRunningOrSuccessful - } - }) - - val celebornConf = SparkUtils.fromSparkConf(sparkSession.sparkContext.getConf) - val hook = new ShuffleReaderGetHook(celebornConf, speculation = true) - TestCelebornShuffleManager.registerReaderGetHook(hook) - - val value = Range(1, 10000).mkString(",") - val tuples = sparkSession.sparkContext.parallelize(1 to 10000, 2) - .map { i => (i, value) }.groupByKey(2).collect() - - // verify result - assert(hook.executed.get() == true) - assert(preventUnnecessaryStageRerun) - assert(tuples.length == 10000) - for (elem <- tuples) { - assert(elem._2.mkString(",").equals(value)) - } - - shuffleMgr.unregisterShuffle(0) - assert(lifecycleManager.getUnregisterShuffleTime().containsKey(0)) - assert(lifecycleManager.getUnregisterShuffleTime().containsKey(1)) - - sparkSession.stop() - } - private def findAppShuffleId(rdd: RDD[_]): Int = { val deps = rdd.dependencies if (deps.size != 1 && !deps.head.isInstanceOf[ShuffleDependency[_, _, _]]) { diff --git a/tests/spark-it/src/test/scala/org/apache/spark/scheduler/SparkSchedulerHelper.scala b/tests/spark-it/src/test/scala/org/apache/spark/scheduler/SparkSchedulerHelper.scala deleted file mode 100644 index 18c20ccf222..00000000000 --- a/tests/spark-it/src/test/scala/org/apache/spark/scheduler/SparkSchedulerHelper.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler - -import org.apache.spark.SparkContext - -object SparkSchedulerHelper { - def getTaskIndex(taskId: Long): Int = { - val scheduler = SparkContext.getActive.get.taskScheduler.asInstanceOf[TaskSchedulerImpl] - val taskSetManager = scheduler.taskIdToTaskSetManager.get(taskId) - taskSetManager.taskInfos.get(taskId).get.index - } -}