
[Bug]: Spark client panics when reading DataFrame from entire repository / storageNamespace #7914

Closed
Isan-Rivkin opened this issue Jun 24, 2024 · 1 comment · Fixed by #7955
Labels: bug (Something isn't working), tech-debt

Isan-Rivkin commented Jun 24, 2024

What happened

Our Spark client panics when we create a DataFrame over all metaranges/ranges (i.e., for a whole repository or storage namespace) instead of for a specific commit ID.

Steps to Reproduce:

  1. Have a lakeFS installation with objects under _lakefs/actions/ or _lakefs/exported/, or just a dummy directory such as _lakefs/dummydir/.
  2. Initialize the Spark client and call LakeFSContext.newDF without a specific commitID, i.e. using the repository or storageNamespace config params. This is where it breaks.
  3. Call df.count() to force iteration over all objects (see the example code below, but with the repository or storageNamespace parameters).
  4. Observe the exception: Caused by: io.treeverse.jpebble.BadFileFormatException: Bad magic 30 30 63 38 61 22 0a 7d: wrong bytes

Expected behavior

df.count() should return the number of objects.

lakeFS version

lakefs-spark-client-assembly_2.12-0.14.0

How lakeFS is installed

AWS

Affected clients

lakefs-spark-client-assembly_2.12-0.14.0

Relevant log output

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: io.treeverse.jpebble.BadFileFormatException: Bad magic 30 30 63 38 61 22 0a 7d: wrong bytes
  at io.treeverse.jpebble.BlockParser$.readMagic(BlockParser.scala:142)
  at io.treeverse.jpebble.BlockParser$.readFooter(BlockParser.scala:197)
  at io.treeverse.clients.SSTableReader.getProperties(SSTableReader.scala:119)
  at io.treeverse.clients.EntryRecordReader.initialize(LakeFSInputFormat.scala:108)
  at org.apache.spark.rdd.NewHadoopRDD$$anon$1.liftedTree1$1(NewHadoopRDD.scala:221)
  at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:218)
  at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:173)
  at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:72)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
  at org.apache.spark.scheduler.Task.run(Task.scala:141)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:829)
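
A hint from the trace above: the "bad magic" bytes are all printable ASCII. Decoding them (a diagnostic sketch, not part of the client) suggests the reader opened a JSON-like text file, e.g. an action run record under _lakefs/actions/, and tried to parse it as an SSTable:

```scala
// Diagnostic sketch: decode the magic bytes reported by
// io.treeverse.jpebble.BadFileFormatException as ASCII characters.
object DecodeMagic extends App {
  val magic = Seq(0x30, 0x30, 0x63, 0x38, 0x61, 0x22, 0x0a, 0x7d)
  val decoded = magic.map(_.toChar).mkString
  // Prints: 00c8a"
  // }
  // i.e. the tail of a JSON document, not an SSTable footer.
  println(decoded)
}
```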

Example code

// spark-shell --jars jars4/hadoop-common-3.3.4.jar,jars4/hadoop-aws-3.3.4.jar,jars4/aws-java-sdk-bundle-1.12.262.jar,lakefs-spark-client-assembly-0.14.0.jar 

{
    import org.apache.spark.SparkConf
    import io.treeverse.clients.LakeFSContext
    import org.apache.spark.sql.SparkSession
    import io.treeverse.clients.LakeFSJobParams

    val spark = SparkSession.builder().appName("a feature not a bug").master("local").getOrCreate()
    spark.sparkContext.hadoopConfiguration.set("lakefs.api.url", "https://<host>/api/v1")
    spark.sparkContext.hadoopConfiguration.set("lakefs.api.access_key", "...")
    spark.sparkContext.hadoopConfiguration.set("lakefs.api.secret_key", "...")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "...")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "...")

    val df = LakeFSContext.newDF(spark, LakeFSJobParams.forRepository("repo-name", "example"))
    
    df.count()
}
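
A plausible root cause, hedged: when scoped to a repository or storage namespace, the client appears to list everything under the _lakefs/ prefix and hand each object to the SSTable reader, including non-range files such as action run records. A minimal sketch of the kind of filtering a fix might apply (all names and layout here are illustrative, not the client's actual code):

```scala
// Illustrative sketch: keep only objects that sit directly under the
// _lakefs/ prefix (where range/metarange SSTables live, by assumption),
// skipping sub-directories like _lakefs/actions/ and _lakefs/exported/.
object FilterRanges extends App {
  // Hypothetical object listing under the repository's storage namespace.
  val keys = Seq(
    "_lakefs/0a1b2c3d4e5f",           // a range/metarange file (SSTable)
    "_lakefs/actions/runs/abc.json",  // action run record -- not an SSTable
    "_lakefs/exported/state.json"     // export state -- not an SSTable
  )
  val rangeFiles = keys.filter { k =>
    k.startsWith("_lakefs/") && !k.stripPrefix("_lakefs/").contains("/")
  }
  println(rangeFiles) // List(_lakefs/0a1b2c3d4e5f)
}
```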
Isan-Rivkin added the bug label on Jun 24, 2024
Isan-Rivkin (Author) commented:

@arielshaqed 2 cents please
