
Howto DataSets


Find min and max values

scala> import org.apache.spark.sql.functions.{min, max}
scala> input.agg(min("value"), max("value")).show

https://stackoverflow.com/questions/43232363/get-min-and-max-from-a-specific-column-scala-spark-dataframe
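
To pull the min and max back into driver code instead of printing them, a minimal sketch (assuming the column is named "value" and is numeric) is:

import org.apache.spark.sql.functions.{min, max}

// head returns a single Row holding both aggregates
val row = input.agg(min("value"), max("value")).head
val minValue = row.get(0)   // use row.getAs[Long] / getAs[Double] once the column type is known
val maxValue = row.get(1)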

Filter rows on a null condition

scala> orc.filter(orc.col("ss_sold_date_sk").isNull).count
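
The inverse filter, and a shortcut that drops such rows, look like this (a sketch against the same orc dataset and column):

// keep only rows where the column has a value
val nonNull = orc.filter(orc.col("ss_sold_date_sk").isNotNull)
// na.drop removes rows that are null in the listed columns
val cleaned = orc.na.drop(Seq("ss_sold_date_sk"))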

Sum of a column in a dataset

scala> import org.apache.spark.sql.functions.sum
scala> dataset.agg(sum("col_name")).show
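
To get the sum back as a Scala value rather than a printed table, a sketch (assuming an integral column, for which Spark's sum yields a long):

import org.apache.spark.sql.functions.sum

// head gives the single result Row; index 0 holds the sum
val total = dataset.agg(sum("col_name")).head.getLong(0)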

How to rename columns in a DS

val newNames = Seq("id", "x1", "x2", "x3")
val dfRenamed = df.toDF(newNames: _*)

How to make an alias for a specific column

df.select($"_1".alias("x1"))

For multiple columns

import org.apache.spark.sql.functions.col

val lookup = Map("_1" -> "foo", "_3" -> "bar")
df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)

or withColumnRenamed:

df.withColumnRenamed("_1", "x1")

which can be combined with foldLeft to rename multiple columns:

lookup.foldLeft(df)((acc, ca) => acc.withColumnRenamed(ca._1, ca._2))

ref: http://stackoverflow.com/questions/35592917/renaming-column-names-of-a-data-frame-in-spark-scala
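
Putting the pieces together, a small self-contained sketch (the example data and new column names are just for illustration):

// in spark-shell; otherwise bring in spark.implicits._ for toDF on a Seq
import org.apache.spark.sql.functions.col

val df = Seq((1, "a", 0.5), (2, "b", 1.5)).toDF("_1", "_2", "_3")
val lookup = Map("_1" -> "id", "_3" -> "score")

// rename every column positionally
df.toDF("id", "label", "score").printSchema()
// rename only the looked-up columns, keeping the rest as-is
df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*).printSchema()
// same result with withColumnRenamed + foldLeft
lookup.foldLeft(df)((acc, kv) => acc.withColumnRenamed(kv._1, kv._2)).printSchema()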

How to add a new index column with values in the range [0, n-1]

import spark.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// your input data set 
val input = spark.read.parquet("/sql/data1.pq") 
// the same data as an RDD[(Row, Long)], pairing every row with its zip index
val inputWithZipIndex = input.rdd.zipWithIndex
// to get back to a Dataset[Row], fold the index into each Row and extend the old schema with one extra column to hold it
// new schema = old schema + a new column
val newSchema = StructType(input.schema.fields :+ StructField("myIndex", LongType, nullable = false))
val inputRowRdd = inputWithZipIndex.map { case (row, zipIndex) => Row.fromSeq(row.toSeq :+ zipIndex) }
// now we convert the RDD back to a dataframe 
val inputWithIndex = spark.createDataFrame(inputRowRdd, newSchema) 
// now at this point we have a data set with a new column named "myIndex" which goes from [0, elements_in_rdd - 1] 
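
A quick sanity check on the result (same names as above, purely illustrative):

import org.apache.spark.sql.functions.{min, max}

// the index should run from 0 to count - 1, in the original row order
inputWithIndex.agg(min("myIndex"), max("myIndex")).show
inputWithIndex.orderBy("myIndex").show(5)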

How to check if a DS is cached or not

scala> :type q2
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]

scala> val cache = spark.sharedState.cacheManager
scala> cache.lookupCachedData(q2.queryExecution.logical).isDefined
res0: Boolean = false

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-caching.html
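
A related check goes through the catalog, but only works for data registered as a table or view (the view name here is just for illustration):

// register the dataset as a temporary view, cache it by name, then ask the catalog
q2.createOrReplaceTempView("q2_view")
spark.catalog.isCached("q2_view")      // false
spark.catalog.cacheTable("q2_view")
spark.catalog.isCached("q2_view")      // true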
