diff --git a/gradle.properties b/gradle.properties index f577604f..4c9f3281 100644 --- a/gradle.properties +++ b/gradle.properties @@ -7,10 +7,10 @@ GROUP=org.jetbrains.kotlinx.spark # can also be defined like ./gradlew -Pspark=X.X.X -Pscala=X.X.X build spark=3.5.1 #spark=3.4.2 -scala=2.13.13 +scala=2.13.14 #scala=2.12.19 skipScalaOnlyDependent=false -sparkConnect=false +sparkConnect=true org.gradle.caching=true org.gradle.parallel=false #kotlin.incremental.useClasspathSnapshot=true diff --git a/gradle/bootstraps/compiler-plugin.jar b/gradle/bootstraps/compiler-plugin.jar index 6de5e469..78fe0355 100644 Binary files a/gradle/bootstraps/compiler-plugin.jar and b/gradle/bootstraps/compiler-plugin.jar differ diff --git a/gradle/bootstraps/gradle-plugin.jar b/gradle/bootstraps/gradle-plugin.jar index 740a990d..cbc37327 100644 Binary files a/gradle/bootstraps/gradle-plugin.jar and b/gradle/bootstraps/gradle-plugin.jar differ diff --git a/kotlin-spark-api/build.gradle.kts b/kotlin-spark-api/build.gradle.kts index 9e0097d7..ede91174 100644 --- a/kotlin-spark-api/build.gradle.kts +++ b/kotlin-spark-api/build.gradle.kts @@ -17,7 +17,6 @@ plugins { group = Versions.groupID version = Versions.project - repositories { mavenCentral() mavenLocal() @@ -33,7 +32,7 @@ dependencies { Projects { api( scalaHelpers, - scalaTuplesInKotlin + scalaTuplesInKotlin, ) } @@ -42,14 +41,18 @@ dependencies { // https://github.com/FasterXML/jackson-bom/issues/52 if (Versions.spark == "3.3.1") implementation(jacksonDatabind) - // if (Versions.sparkConnect) TODO("unsupported for now") + if (Versions.sparkConnect) { + // IMPORTANT! + compileOnly(sparkSqlApi) + implementation(sparkConnectClient) + } else { + implementation(sparkSql) + } implementation( + hadoopClient, kotlinStdLib, reflect, - sparkSql, - sparkStreaming, - hadoopClient, kotlinDateTime, ) @@ -68,7 +71,10 @@ dependencies { // Setup preprocessing with JCP for main sources -val kotlinMainSources = kotlin.sourceSets.main.get().kotlin.sourceDirectories +val kotlinMainSources = + kotlin.sourceSets.main + .get() + .kotlin.sourceDirectories val preprocessMain by tasks.creating(JcpTask::class) { sources = kotlinMainSources @@ -107,7 +113,10 @@ tasks.compileKotlin { // Setup preprocessing with JCP for test sources -val kotlinTestSources = kotlin.sourceSets.test.get().kotlin.sourceDirectories +val kotlinTestSources = + kotlin.sourceSets.test + .get() + .kotlin.sourceDirectories val preprocessTest by tasks.creating(JcpTask::class) { sources = kotlinTestSources diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Conversions.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Conversions.kt index 44da4147..8aff104e 100644 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Conversions.kt +++ b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Conversions.kt @@ -27,8 +27,7 @@ package org.jetbrains.kotlinx.spark.api -import org.apache.spark.api.java.Optional -import scala.* +import scala.* import java.util.* import java.util.Enumeration import java.util.concurrent.ConcurrentMap @@ -43,14 +42,6 @@ import scala.collection.mutable.Buffer as ScalaMutableBuffer import scala.collection.mutable.Map as ScalaMutableMap import scala.collection.mutable.Seq as ScalaMutableSeq import scala.collection.mutable.Set as ScalaMutableSet -import org.apache.spark.streaming.State - -/** Returns state value if it exists, else `null`. */ -fun State.getOrNull(): T? 
= if (exists()) get() else null - -/** Returns state value if it exists, else [other]. */ -fun State.getOrElse(other: T): T = if (exists()) get() else other - /** Converts Scala [Option] to Kotlin nullable. */ fun Option.getOrNull(): T? = getOrElse(null) @@ -59,20 +50,20 @@ fun Option.getOrNull(): T? = getOrElse(null) fun Option.getOrElse(other: T): T = getOrElse { other } /** Converts nullable value to Scala [Option]. */ -fun T?.toOption(): Option = Option.apply(this) +fun T.toOption(): Option = Option.apply(this) /** Converts Scala [Option] to Java [Optional]. */ -fun Option.toOptional(): Optional = Optional.ofNullable(getOrNull()) +fun Option.toOptional(): Optional = Optional.ofNullable(getOrNull()) /** Converts [Optional] to Kotlin nullable. */ -fun Optional.getOrNull(): T? = orNull() +fun Optional.getOrNull(): T? = orElse(null) /** Get if available else [other]. */ fun Optional.getOrElse(other: T): T = orElse(other) /** Converts nullable value to [Optional]. */ -fun T?.toOptional(): Optional = Optional.ofNullable(this) +fun T.toOptional(): Optional = Optional.ofNullable(this) /** Converts Java [Optional] to Scala [Option]. */ fun Optional.toOption(): Option = Option.apply(getOrNull()) diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt index 6b7b0af1..d822f87f 100644 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt +++ b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Dataset.kt @@ -29,14 +29,12 @@ package org.jetbrains.kotlinx.spark.api -import org.apache.spark.api.java.JavaRDDLike import org.apache.spark.api.java.function.FlatMapFunction import org.apache.spark.api.java.function.ForeachFunction import org.apache.spark.api.java.function.ForeachPartitionFunction import org.apache.spark.api.java.function.MapFunction import org.apache.spark.api.java.function.MapPartitionsFunction import org.apache.spark.api.java.function.ReduceFunction -import org.apache.spark.rdd.RDD import org.apache.spark.sql.* import scala.Tuple2 import scala.Tuple3 @@ -100,33 +98,6 @@ inline fun Array.toDS(spark: SparkSession): Dataset = inline fun Array.toDF(spark: SparkSession, vararg colNames: String): Dataset = toDS(spark).run { if (colNames.isEmpty()) toDF() else toDF(*colNames) } -/** - * Utility method to create dataset from RDD - */ -inline fun RDD.toDS(spark: SparkSession): Dataset = - spark.createDataset(this, kotlinEncoderFor()) - -/** - * Utility method to create dataset from JavaRDD - */ -inline fun JavaRDDLike.toDS(spark: SparkSession): Dataset = - spark.createDataset(this.rdd(), kotlinEncoderFor()) - -/** - * Utility method to create Dataset (Dataframe) from JavaRDD. - * NOTE: [T] must be [Serializable]. - */ -inline fun JavaRDDLike.toDF(spark: SparkSession, vararg colNames: String): Dataset = - toDS(spark).run { if (colNames.isEmpty()) toDF() else toDF(*colNames) } - -/** - * Utility method to create Dataset (Dataframe) from RDD. - * NOTE: [T] must be [Serializable]. - */ -inline fun RDD.toDF(spark: SparkSession, vararg colNames: String): Dataset = - toDS(spark).run { if (colNames.isEmpty()) toDF() else toDF(*colNames) } - - /** * (Kotlin-specific) * Returns a new Dataset that contains the result of applying [func] to each element. 
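// Illustrative usage sketch for the Conversions.kt helpers changed above. With the
// org.apache.spark.api.java.Optional import gone, Optional here is java.util.Optional,
// which is why orNull() became orElse(null) and the receivers became non-nullable.
// Assumes the generic signatures fun <T> Option<T>.getOrNull(): T?, fun <T> T.toOption(): Option<T>,
// fun <T> Option<T>.toOptional(): Optional<T> and fun <T> Optional<T>.toOption(): Option<T>.
import org.jetbrains.kotlinx.spark.api.*
import scala.Option
import java.util.Optional

fun optionConversionRoundTrip() {
    val scalaOpt: Option<String> = "value".toOption()      // Option.apply("value")
    val javaOpt: Optional<String> = scalaOpt.toOptional()  // java.util.Optional
    val nullable: String? = javaOpt.getOrNull()            // now backed by orElse(null)
    println(nullable == javaOpt.toOption().getOrNull())    // prints "true"
}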
@@ -271,21 +242,6 @@ inline fun Dataset.forEach(noinline func: (T) -> Unit): Unit = fo inline fun Dataset.forEachPartition(noinline func: (Iterator) -> Unit): Unit = foreachPartition(ForeachPartitionFunction(func)) -/** - * It's hard to call `Dataset.debugCodegen` from kotlin, so here is utility for that - */ -fun Dataset.debugCodegen(): Dataset = also { - org.apache.spark.sql.execution.debug.`package$`.`MODULE$`.DebugQuery(it).debugCodegen() -} - -/** - * It's hard to call `Dataset.debug` from kotlin, so here is utility for that - */ -fun Dataset.debug(): Dataset = also { - org.apache.spark.sql.execution.debug.`package$`.`MODULE$`.DebugQuery(it).debug() -} - - /** * Alias for [Dataset.joinWith] which passes "left" argument * and respects the fact that in result of left join right relation is nullable diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt index 0306b8f6..219be4ae 100644 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt +++ b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Encoding.kt @@ -116,9 +116,9 @@ fun kotlinEncoderFor(kType: KType): Encoder = */ private fun applyEncoder(agnosticEncoder: AgnosticEncoder): Encoder { //#if sparkConnect == false - return org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.apply(agnosticEncoder) + //$return org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.apply(agnosticEncoder) //#else - //$return agnosticEncoder + return agnosticEncoder //#endif } diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Rdd.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Rdd.kt deleted file mode 100644 index 0ab701b4..00000000 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Rdd.kt +++ /dev/null @@ -1,47 +0,0 @@ -package org.jetbrains.kotlinx.spark.api - -import org.apache.spark.api.java.JavaRDD -import org.apache.spark.api.java.JavaSparkContext -import java.io.Serializable - -/** - * Utility method to create an RDD from a list. - * NOTE: [T] must be [Serializable]. - */ -fun JavaSparkContext.rddOf( - vararg elements: T, - numSlices: Int = defaultParallelism(), -): JavaRDD = parallelize(elements.asList(), numSlices) - -/** - * Utility method to create an RDD from a list. - * NOTE: [T] must be [Serializable]. - */ -fun JavaSparkContext.toRDD( - elements: List, - numSlices: Int = defaultParallelism(), -): JavaRDD = parallelize(elements, numSlices) - -/** - * Returns the minimum element from this RDD as defined by the specified - * [Comparator]. - * - * @return the minimum of the RDD - */ -fun > JavaRDD.min(): T = min( - object : Comparator, Serializable { - override fun compare(o1: T, o2: T): Int = o1.compareTo(o2) - } -) - -/** - * Returns the maximum element from this RDD as defined by the specified - * [Comparator]. 
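// Note on the Encoding.kt hunk above: JCP's //$ prefix injects the rest of that line into the
// preprocessed source only when its //#if branch is selected, while the plain line in the active
// branch also keeps the checked-in file compilable without preprocessing. Flipping which line
// carries //$ therefore switches the compiled body of applyEncoder together with the new
// sparkConnect=true default in gradle.properties (a sketch of the two outcomes, as understood
// from the directives):
//
//   sparkConnect == false -> return org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.apply(agnosticEncoder)
//   sparkConnect == true  -> return agnosticEncoder
//
// The Connect path hands back the AgnosticEncoder unchanged, since the Connect client works with
// agnostic encoders directly rather than Catalyst's ExpressionEncoder.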
- * - * @return the maximum of the RDD - */ -fun > JavaRDD.max(): T = max( - object : Comparator, Serializable { - override fun compare(o1: T, o2: T): Int = o1.compareTo(o2) - } -) diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/RddDouble.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/RddDouble.kt deleted file mode 100644 index 6bc28203..00000000 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/RddDouble.kt +++ /dev/null @@ -1,105 +0,0 @@ -package org.jetbrains.kotlinx.spark.api - -import org.apache.spark.api.java.JavaDoubleRDD -import org.apache.spark.api.java.JavaRDD -import org.apache.spark.partial.BoundedDouble -import org.apache.spark.partial.PartialResult -import org.apache.spark.rdd.RDD -import org.apache.spark.util.StatCounter -import scala.Tuple2 - -/** Utility method to convert [JavaRDD]<[Number]> to [JavaDoubleRDD]. */ -@Suppress("UNCHECKED_CAST") -inline fun JavaRDD.toJavaDoubleRDD(): JavaDoubleRDD = - JavaDoubleRDD.fromRDD( - when (T::class) { - Double::class -> this - else -> map(Number::toDouble) - }.rdd() as RDD - ) - -/** Utility method to convert [JavaDoubleRDD] to [JavaRDD]<[Double]>. */ -@Suppress("UNCHECKED_CAST") -inline fun JavaDoubleRDD.toDoubleRDD(): JavaRDD = - JavaDoubleRDD.toRDD(this).toJavaRDD() as JavaRDD - -/** Add up the elements in this RDD. */ -inline fun JavaRDD.sum(): Double = toJavaDoubleRDD().sum() - -/** - * Return a [org.apache.spark.util.StatCounter] object that captures the mean, variance and - * count of the RDD's elements in one operation. - */ -inline fun JavaRDD.stats(): StatCounter = toJavaDoubleRDD().stats() - -/** Compute the mean of this RDD's elements. */ -inline fun JavaRDD.mean(): Double = toJavaDoubleRDD().mean() - -/** Compute the population variance of this RDD's elements. */ -inline fun JavaRDD.variance(): Double = toJavaDoubleRDD().variance() - -/** Compute the population standard deviation of this RDD's elements. */ -inline fun JavaRDD.stdev(): Double = toJavaDoubleRDD().stdev() - -/** - * Compute the sample standard deviation of this RDD's elements (which corrects for bias in - * estimating the standard deviation by dividing by N-1 instead of N). - */ -inline fun JavaRDD.sampleStdev(): Double = toJavaDoubleRDD().sampleStdev() - -/** - * Compute the sample variance of this RDD's elements (which corrects for bias in - * estimating the variance by dividing by N-1 instead of N). - */ -inline fun JavaRDD.sampleVariance(): Double = toJavaDoubleRDD().sampleVariance() - -/** Compute the population standard deviation of this RDD's elements. */ -inline fun JavaRDD.popStdev(): Double = toJavaDoubleRDD().popStdev() - -/** Compute the population variance of this RDD's elements. */ -inline fun JavaRDD.popVariance(): Double = toJavaDoubleRDD().popVariance() - -/** Approximate operation to return the mean within a timeout. */ -inline fun JavaRDD.meanApprox( - timeout: Long, - confidence: Double = 0.95, -): PartialResult = toJavaDoubleRDD().meanApprox(timeout, confidence) - -/** Approximate operation to return the sum within a timeout. */ -inline fun JavaRDD.sumApprox( - timeout: Long, - confidence: Double = 0.95, -): PartialResult = toJavaDoubleRDD().sumApprox(timeout, confidence) - -/** - * Compute a histogram of the data using bucketCount number of buckets evenly - * spaced between the minimum and maximum of the RDD. For example if the min - * value is 0 and the max is 100 and there are two buckets the resulting - * buckets will be `[0, 50)` `[50, 100]`. 
bucketCount must be at least 1 - * If the RDD contains infinity, NaN throws an exception - * If the elements in RDD do not vary (max == min) always returns a single bucket. - */ -inline fun JavaRDD.histogram(bucketCount: Int): Tuple2 = - toJavaDoubleRDD().histogram(bucketCount) - -/** - * Compute a histogram using the provided buckets. The buckets are all open - * to the right except for the last which is closed. - * e.g. for the array - * `[1, 10, 20, 50]` the buckets are `[1, 10) [10, 20) [20, 50]` - * e.g. ` <=x<10, 10<=x<20, 20<=x<=50` - * And on the input of 1 and 50 we would have a histogram of 1, 0, 1 - * - * Note: If your histogram is evenly spaced (e.g. `[0, 10, 20, 30]`) this can be switched - * from an O(log n) insertion to O(1) per element. (where n = # buckets) if you set evenBuckets - * to true. - * buckets must be sorted and not contain any duplicates. - * buckets array must be at least two elements - * All NaN entries are treated the same. If you have a NaN bucket it must be - * the maximum value of the last position and all NaN entries will be counted - * in that bucket. - */ -inline fun JavaRDD.histogram( - buckets: Array, - evenBuckets: Boolean = false, -): LongArray = toJavaDoubleRDD().histogram(buckets, evenBuckets) \ No newline at end of file diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/RddKeyValue.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/RddKeyValue.kt deleted file mode 100644 index b04bf6b9..00000000 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/RddKeyValue.kt +++ /dev/null @@ -1,944 +0,0 @@ -@file:Suppress("unused") - -package org.jetbrains.kotlinx.spark.api - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.compress.CompressionCodec -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapred.OutputFormat -import org.apache.spark.Partitioner -import org.apache.spark.RangePartitioner -import org.apache.spark.api.java.JavaPairRDD -import org.apache.spark.api.java.JavaRDD -import org.apache.spark.api.java.Optional -import org.apache.spark.partial.BoundedDouble -import org.apache.spark.partial.PartialResult -import org.apache.spark.serializer.Serializer -import org.jetbrains.kotlinx.spark.api.tuples.* -import scala.Tuple2 -import scala.Tuple3 -import scala.Tuple4 -import kotlin.random.Random -import org.apache.hadoop.mapreduce.OutputFormat as NewOutputFormat - -/** Utility method to convert [JavaRDD]<[Tuple2]> to [JavaPairRDD]. */ -fun JavaRDD>.toJavaPairRDD(): JavaPairRDD = - JavaPairRDD.fromJavaRDD(this) - -/** Utility method to convert [JavaPairRDD] to [JavaRDD]<[Tuple2]>. */ -fun JavaPairRDD.toTupleRDD(): JavaRDD> = - JavaPairRDD.toRDD(this).toJavaRDD() - -/** - * Generic function to combine the elements for each key using a custom set of aggregation - * functions. This method is here for backward compatibility. It does not provide combiner - * classtag information to the shuffle. - */ -fun JavaRDD>.combineByKey( - createCombiner: (V) -> C, - mergeValue: (C, V) -> C, - mergeCombiners: (C, C) -> C, - partitioner: Partitioner, - mapSideCombine: Boolean = true, - serializer: Serializer? = null, -): JavaRDD> = toJavaPairRDD() - .combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine, serializer) - .toTupleRDD() - -/** - * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD. - * This method is here for backward compatibility. 
It does not provide combiner - * classtag information to the shuffle. - */ -fun JavaRDD>.combineByKey( - createCombiner: (V) -> C, - mergeValue: (C, V) -> C, - mergeCombiners: (C, C) -> C, - numPartitions: Int, -): JavaRDD> = toJavaPairRDD() - .combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions) - .toTupleRDD() - - -/** - * Aggregate the values of each key, using given combine functions and a neutral "zero value". - * This function can return a different result type, [U], than the type of the values in this RDD, - * [V]. Thus, we need one operation for merging a [V] into a [U] and one operation for merging two [U]'s, - * as in scala.TraversableOnce. The former operation is used for merging values within a - * partition, and the latter is used for merging values between partitions. To avoid memory - * allocation, both of these functions are allowed to modify and return their first argument - * instead of creating a new [U]. - */ -fun JavaRDD>.aggregateByKey( - zeroValue: U, - partitioner: Partitioner, - seqFunc: (U, V) -> U, - combFunc: (U, U) -> U, -): JavaRDD> = toJavaPairRDD() - .aggregateByKey(zeroValue, partitioner, seqFunc, combFunc) - .toTupleRDD() - -/** - * Aggregate the values of each key, using given combine functions and a neutral "zero value". - * This function can return a different result type, [U], than the type of the values in this RDD, - * [V]. Thus, we need one operation for merging a [V] into a [U] and one operation for merging two [U]'s, - * as in scala.TraversableOnce. The former operation is used for merging values within a - * partition, and the latter is used for merging values between partitions. To avoid memory - * allocation, both of these functions are allowed to modify and return their first argument - * instead of creating a new [U]. - */ -fun JavaRDD>.aggregateByKey( - zeroValue: U, - numPartitions: Int, - seqFunc: (U, V) -> U, - combFunc: (U, U) -> U, -): JavaRDD> = toJavaPairRDD() - .aggregateByKey(zeroValue, numPartitions, seqFunc, combFunc) - .toTupleRDD() - -/** - * Aggregate the values of each key, using given combine functions and a neutral "zero value". - * This function can return a different result type, [U], than the type of the values in this RDD, - * [V]. Thus, we need one operation for merging a [V] into a [U] and one operation for merging two [U]'s, - * as in scala.TraversableOnce. The former operation is used for merging values within a - * partition, and the latter is used for merging values between partitions. To avoid memory - * allocation, both of these functions are allowed to modify and return their first argument - * instead of creating a new [U]. - */ -fun JavaRDD>.aggregateByKey( - zeroValue: U, - seqFunc: (U, V) -> U, - combFunc: (U, U) -> U, -): JavaRDD> = toJavaPairRDD() - .aggregateByKey(zeroValue, seqFunc, combFunc) - .toTupleRDD() - -/** - * Merge the values for each key using an associative function and a neutral "zero value" which - * may be added to the result an arbitrary number of times, and must not change the result - * (e.g., [emptyList] for list concatenation, 0 for addition, or 1 for multiplication.). 
- */ -fun JavaRDD>.foldByKey( - zeroValue: V, - partitioner: Partitioner, - func: (V, V) -> V, -): JavaRDD> = toJavaPairRDD() - .foldByKey(zeroValue, partitioner, func) - .toTupleRDD() - -/** - * Merge the values for each key using an associative function and a neutral "zero value" which - * may be added to the result an arbitrary number of times, and must not change the result - * (e.g., [emptyList] for list concatenation, 0 for addition, or 1 for multiplication.). - */ -fun JavaRDD>.foldByKey( - zeroValue: V, - numPartitions: Int, - func: (V, V) -> V, -): JavaRDD> = toJavaPairRDD() - .foldByKey(zeroValue, numPartitions, func) - .toTupleRDD() - -/** - * Merge the values for each key using an associative function and a neutral "zero value" which - * may be added to the result an arbitrary number of times, and must not change the result - * (e.g., [emptyList] for list concatenation, 0 for addition, or 1 for multiplication.). - */ -fun JavaRDD>.foldByKey( - zeroValue: V, - func: (V, V) -> V, -): JavaRDD> = toJavaPairRDD() - .foldByKey(zeroValue, func) - .toTupleRDD() - -/** - * Return a subset of this RDD sampled by key (via stratified sampling). - * - * Create a sample of this RDD using variable sampling rates for different keys as specified by - * [fractions], a key to sampling rate map, via simple random sampling with one pass over the - * RDD, to produce a sample of size that's approximately equal to the sum of - * math.ceil(numItems * samplingRate) over all key values. - * - * @param withReplacement whether to sample with or without replacement - * @param fractions map of specific keys to sampling rates - * @param seed seed for the random number generator - * @return RDD containing the sampled subset - */ -fun JavaRDD>.sampleByKey( - withReplacement: Boolean, - fractions: Map, - seed: Long = Random.nextLong(), -): JavaRDD> = toJavaPairRDD() - .sampleByKey(withReplacement, fractions, seed) - .toTupleRDD() - -/** - * Return a subset of this RDD sampled by key (via stratified sampling) containing exactly - * math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key). - * - * This method differs from [sampleByKey] in that we make additional passes over the RDD to - * create a sample size that's exactly equal to the sum of math.ceil(numItems * samplingRate) - * over all key values with a 99.99% confidence. When sampling without replacement, we need one - * additional pass over the RDD to guarantee sample size; when sampling with replacement, we need - * two additional passes. - * - * @param withReplacement whether to sample with or without replacement - * @param fractions map of specific keys to sampling rates - * @param seed seed for the random number generator - * @return RDD containing the sampled subset - */ -fun JavaRDD>.sampleByKeyExact( - withReplacement: Boolean, - fractions: Map, - seed: Long = Random.nextLong(), -): JavaRDD> = toJavaPairRDD() - .sampleByKeyExact(withReplacement, fractions, seed) - .toTupleRDD() - -/** - * Merge the values for each key using an associative and commutative reduce function. This will - * also perform the merging locally on each mapper before sending results to a reducer, similarly - * to a "combiner" in MapReduce. - */ -fun JavaRDD>.reduceByKey( - partitioner: Partitioner, - func: (V, V) -> V, -): JavaRDD> = toJavaPairRDD() - .reduceByKey(partitioner, func) - .toTupleRDD() - -/** - * Merge the values for each key using an associative and commutative reduce function. 
This will - * also perform the merging locally on each mapper before sending results to a reducer, similarly - * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. - */ -fun JavaRDD>.reduceByKey( - numPartitions: Int, - func: (V, V) -> V, -): JavaRDD> = toJavaPairRDD() - .reduceByKey(func, numPartitions) - .toTupleRDD() - -/** - * Merge the values for each key using an associative and commutative reduce function. This will - * also perform the merging locally on each mapper before sending results to a reducer, similarly - * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ - * parallelism level. - */ -fun JavaRDD>.reduceByKey( - func: (V, V) -> V, -): JavaRDD> = toJavaPairRDD() - .reduceByKey(func) - .toTupleRDD() - -/** - * Merge the values for each key using an associative and commutative reduce function, but return - * the results immediately to the master as a Map. This will also perform the merging locally on - * each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce. - */ -fun JavaRDD>.reduceByKeyLocally( - func: (V, V) -> V, -): Map = toJavaPairRDD() - .reduceByKeyLocally(func) - -/** - * Count the number of elements for each key, collecting the results to a local Map. - * - * This method should only be used if the resulting map is expected to be small, as - * the whole thing is loaded into the driver's memory. - * To handle very large results, consider using `rdd.mapValues { 1L }.reduceByKey(Long::plus)`, which - * returns an [RDD] instead of a map. - */ -fun JavaRDD>.countByKey(): Map = - toJavaPairRDD() - .countByKey() - -/** - * Approximate version of countByKey that can return a partial result if it does - * not finish within a timeout. - * - * The confidence is the probability that the error bounds of the result will - * contain the true value. That is, if countApprox were called repeatedly - * with confidence 0.9, we would expect 90% of the results to contain the - * true count. The confidence must be in the range <0,1> or an exception will - * be thrown. - * - * @param timeout maximum time to wait for the job, in milliseconds - * @param confidence the desired statistical confidence in the result - * @return a potentially incomplete result, with error bounds - */ -fun JavaRDD>.countByKeyApprox( - timeout: Long, - confidence: Double = 0.95, -): PartialResult> = toJavaPairRDD() - .countByKeyApprox(timeout, confidence) - -/** - * Group the values for each key in the RDD into a single sequence. Allows controlling the - * partitioning of the resulting key-value pair RDD by passing a Partitioner. - * The ordering of elements within each group is not guaranteed, and may even differ - * each time the resulting RDD is evaluated. - * - * Note: This operation may be very expensive. If you are grouping in order to perform an - * aggregation (such as a sum or average) over each key, using [aggregateByKey] - * or [reduceByKey] will provide much better performance. - * - * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any - * key in memory. If a key has too many values, it can result in an [OutOfMemoryError]. - */ -fun JavaRDD>.groupByKey( - partitioner: Partitioner, -): JavaRDD>> = toJavaPairRDD() - .groupByKey(partitioner) - .toTupleRDD() - -/** - * Group the values for each key in the RDD into a single sequence. Hash-partitions the - * resulting RDD with into [numPartitions] partitions. 
The ordering of elements within - * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated. - * - * Note: This operation may be very expensive. If you are grouping in order to perform an - * aggregation (such as a sum or average) over each key, using [aggregateByKey] - * or [reduceByKey] will provide much better performance. - * - * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any - * key in memory. If a key has too many values, it can result in an [OutOfMemoryError]. - */ -fun JavaRDD>.groupByKey( - numPartitions: Int, -): JavaRDD>> = toJavaPairRDD() - .groupByKey(numPartitions) - .toTupleRDD() - -/** - * Return a copy of the RDD partitioned using the specified partitioner. - */ -fun JavaRDD>.partitionBy( - partitioner: Partitioner, -): JavaRDD> = toJavaPairRDD() - .partitionBy(partitioner) - .toTupleRDD() - -/** - * Return an RDD containing all pairs of elements with matching keys in [this] and [other]. Each - * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in [this] and - * (k, v2) is in [other]. Uses the given Partitioner to partition the output RDD. - */ -fun JavaRDD>.join( - other: JavaRDD>, - partitioner: Partitioner, -): JavaRDD>> = toJavaPairRDD() - .join(other.toJavaPairRDD(), partitioner) - .toTupleRDD() - -/** - * Perform a left outer join of [this] and [other]. For each element (k, v) in [this], the - * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in [other], or the - * pair (k, (v, None)) if no elements in [other] have key k. Uses the given Partitioner to - * partition the output RDD. - */ -fun JavaRDD>.leftOuterJoin( - other: JavaRDD>, - partitioner: Partitioner, -): JavaRDD>>> = toJavaPairRDD() - .leftOuterJoin(other.toJavaPairRDD(), partitioner) - .toTupleRDD() - -/** - * Perform a right outer join of [this] and [other]. For each element (k, w) in [other], the - * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in [this], or the - * pair (k, (None, w)) if no elements in [this] have key k. Uses the given Partitioner to - * partition the output RDD. - */ -fun JavaRDD>.rightOuterJoin( - other: JavaRDD>, - partitioner: Partitioner, -): JavaRDD, W>>> = toJavaPairRDD() - .rightOuterJoin(other.toJavaPairRDD(), partitioner) - .toTupleRDD() - -/** - * Perform a full outer join of [this] and [other]. For each element (k, v) in [this], the - * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in [other], or - * the pair (k, (Some(v), None)) if no elements in [other] have key k. Similarly, for each - * element (k, w) in [other], the resulting RDD will either contain all pairs - * (k, (Some(v), Some(w))) for v in [this], or the pair (k, (None, Some(w))) if no elements - * in [this] have key k. Uses the given Partitioner to partition the output RDD. - */ -fun JavaRDD>.fullOuterJoin( - other: JavaRDD>, - partitioner: Partitioner, -): JavaRDD, Optional>>> = toJavaPairRDD() - .fullOuterJoin(other.toJavaPairRDD(), partitioner) - .toTupleRDD() - -/** - * Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the - * existing partitioner/parallelism level. This method is here for backward compatibility. It - * does not provide combiner classtag information to the shuffle. 
- */ -fun JavaRDD>.combineByKey( - createCombiner: (V) -> C, - mergeValue: (C, V) -> C, - mergeCombiners: (C, C) -> C, -): JavaRDD> = toJavaPairRDD() - .combineByKey(createCombiner, mergeValue, mergeCombiners) - .toTupleRDD() - - -/** - * Group the values for each key in the RDD into a single sequence. Hash-partitions the - * resulting RDD with the existing partitioner/parallelism level. The ordering of elements - * within each group is not guaranteed, and may even differ each time the resulting RDD is - * evaluated. - * - * Note: This operation may be very expensive. If you are grouping in order to perform an - * aggregation (such as a sum or average) over each key, using [aggregateByKey] - * or [reduceByKey] will provide much better performance. - */ -fun JavaRDD>.groupByKey(): JavaRDD>> = - toJavaPairRDD() - .groupByKey() - .toTupleRDD() - -/** - * Return an RDD containing all pairs of elements with matching keys in [this] and [other]. Each - * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in [this] and - * (k, v2) is in [other]. Performs a hash join across the cluster. - */ -fun JavaRDD>.join(other: JavaRDD>): JavaRDD>> = - toJavaPairRDD() - .join(other.toJavaPairRDD()) - .toTupleRDD() - -/** - * Return an RDD containing all pairs of elements with matching keys in [this] and [other]. Each - * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in [this] and - * (k, v2) is in [other]. Performs a hash join across the cluster. - */ -fun JavaRDD>.join( - other: JavaRDD>, - numPartitions: Int, -): JavaRDD>> = - toJavaPairRDD() - .join(other.toJavaPairRDD(), numPartitions) - .toTupleRDD() - -/** - * Perform a left outer join of [this] and [other]. For each element (k, v) in [this], the - * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in [other], or the - * pair (k, (v, None)) if no elements in [other] have key k. Hash-partitions the output - * using the existing partitioner/parallelism level. - */ -fun JavaRDD>.leftOuterJoin( - other: JavaRDD>, -): JavaRDD>>> = - toJavaPairRDD() - .leftOuterJoin(other.toJavaPairRDD()) - .toTupleRDD() - -/** - * Perform a left outer join of [this] and [other]. For each element (k, v) in [this], the - * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in [other], or the - * pair (k, (v, None)) if no elements in [other] have key k. Hash-partitions the output - * into [numPartitions] partitions. - */ -fun JavaRDD>.leftOuterJoin( - other: JavaRDD>, - numPartitions: Int, -): JavaRDD>>> = toJavaPairRDD() - .leftOuterJoin(other.toJavaPairRDD(), numPartitions) - .toTupleRDD() - -/** - * Perform a right outer join of [this] and [other]. For each element (k, w) in [other], the - * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in [this], or the - * pair (k, (None, w)) if no elements in [this] have key k. Hash-partitions the resulting - * RDD using the existing partitioner/parallelism level. - */ -fun JavaRDD>.rightOuterJoin( - other: JavaRDD>, -): JavaRDD, W>>> = - toJavaPairRDD() - .rightOuterJoin(other.toJavaPairRDD()) - .toTupleRDD() - -/** - * Perform a right outer join of [this] and [other]. For each element (k, w) in [other], the - * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in [this], or the - * pair (k, (None, w)) if no elements in [this] have key k. Hash-partitions the resulting - * RDD into the given number of partitions. 
- */ -fun JavaRDD>.rightOuterJoin( - other: JavaRDD>, - numPartitions: Int, -): JavaRDD, W>>> = toJavaPairRDD() - .rightOuterJoin(other.toJavaPairRDD(), numPartitions) - .toTupleRDD() - -/** - * Perform a full outer join of [this] and [other]. For each element (k, v) in [this], the - * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in [other], or - * the pair (k, (Some(v), None)) if no elements in [other] have key k. Similarly, for each - * element (k, w) in [other], the resulting RDD will either contain all pairs - * (k, (Some(v), Some(w))) for v in [this], or the pair (k, (None, Some(w))) if no elements - * in [this] have key k. Hash-partitions the resulting RDD using the existing partitioner/ - * parallelism level. - */ -fun JavaRDD>.fullOuterJoin( - other: JavaRDD>, -): JavaRDD, Optional>>> = - toJavaPairRDD() - .fullOuterJoin(other.toJavaPairRDD()) - .toTupleRDD() - -/** - * Perform a full outer join of [this] and [other]. For each element (k, v) in [this], the - * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in [other], or - * the pair (k, (Some(v), None)) if no elements in [other] have key k. Similarly, for each - * element (k, w) in [other], the resulting RDD will either contain all pairs - * (k, (Some(v), Some(w))) for v in [this], or the pair (k, (None, Some(w))) if no elements - * in [this] have key k. Hash-partitions the resulting RDD into the given number of partitions. - */ -fun JavaRDD>.fullOuterJoin( - other: JavaRDD>, - numPartitions: Int, -): JavaRDD, Optional>>> = - toJavaPairRDD() - .fullOuterJoin(other.toJavaPairRDD(), numPartitions) - .toTupleRDD() - -/** - * Return the key-value pairs in this RDD to the master as a Map. - * - * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only - * one value per key is preserved in the map returned) - * - * Note: this method should only be used if the resulting data is expected to be small, as - * all the data is loaded into the driver's memory. - */ -fun JavaRDD>.collectAsMap(): Map = toJavaPairRDD().collectAsMap() - -/** - * Pass each key in the key-value pair RDD through a map function without changing the values; - * this also retains the original RDD's partitioning. - */ -fun JavaRDD>.mapKeys(f: (K) -> U): JavaRDD> = - mapPartitions({ - it.map { (_1, _2) -> - tupleOf(f(_1), _2) - } - }, true) - -/** - * Pass each value in the key-value pair RDD through a map function without changing the keys; - * this also retains the original RDD's partitioning. - */ -fun JavaRDD>.mapValues(f: (V) -> U): JavaRDD> = - toJavaPairRDD().mapValues(f).toTupleRDD() - -/** - * Pass each value in the key-value pair RDD through a flatMap function without changing the - * keys; this also retains the original RDD's partitioning. - */ -fun JavaRDD>.flatMapValues(f: (V) -> Iterator): JavaRDD> = - toJavaPairRDD().flatMapValues(f).toTupleRDD() - -/** - * For each key k in [this] or [other1] or [other2] or [other3], - * return a resulting RDD that contains a tuple with the list of values - * for that key in [this], [other1], [other2] and [other3]. 
- */ -fun JavaRDD>.cogroup( - other1: JavaRDD>, - other2: JavaRDD>, - other3: JavaRDD>, - partitioner: Partitioner, -): JavaRDD, Iterable, Iterable, Iterable>>> = - toJavaPairRDD().cogroup(other1.toJavaPairRDD(), other2.toJavaPairRDD(), other3.toJavaPairRDD(), partitioner) - .toTupleRDD() - -/** - * For each key k in [this] or [other], return a resulting RDD that contains a tuple with the - * list of values for that key in [this] as well as [other]. - */ -fun JavaRDD>.cogroup( - other: JavaRDD>, - partitioner: Partitioner, -): JavaRDD, Iterable>>> = toJavaPairRDD() - .cogroup(other.toJavaPairRDD(), partitioner) - .toTupleRDD() - -/** - * For each key k in [this] or [other1] or [other2], return a resulting RDD that contains a - * tuple with the list of values for that key in [this], [other1] and [other2]. - */ -fun JavaRDD>.cogroup( - other1: JavaRDD>, - other2: JavaRDD>, - partitioner: Partitioner, -): JavaRDD, Iterable, Iterable>>> = - toJavaPairRDD() - .cogroup(other1.toJavaPairRDD(), other2.toJavaPairRDD(), partitioner) - .toTupleRDD() - -/** - * For each key k in [this] or [other1] or [other2] or [other3], - * return a resulting RDD that contains a tuple with the list of values - * for that key in [this], [other1], [other2] and [other3]. - */ -fun JavaRDD>.cogroup( - other1: JavaRDD>, - other2: JavaRDD>, - other3: JavaRDD>, -): JavaRDD, Iterable, Iterable, Iterable>>> = - toJavaPairRDD() - .cogroup(other1.toJavaPairRDD(), other2.toJavaPairRDD(), other3.toJavaPairRDD()) - .toTupleRDD() - -/** - * For each key k in [this] or [other], return a resulting RDD that contains a tuple with the - * list of values for that key in [this] as well as [other]. - */ -fun JavaRDD>.cogroup( - other: JavaRDD>, -): JavaRDD, Iterable>>> = - toJavaPairRDD().cogroup(other.toJavaPairRDD()).toTupleRDD() - -/** - * For each key k in [this] or [other1] or [other2], return a resulting RDD that contains a - * tuple with the list of values for that key in [this], [other1] and [other2]. - */ -fun JavaRDD>.cogroup( - other1: JavaRDD>, - other2: JavaRDD>, -): JavaRDD, Iterable, Iterable>>> = - toJavaPairRDD() - .cogroup(other1.toJavaPairRDD(), other2.toJavaPairRDD()) - .toTupleRDD() - -/** - * For each key k in [this] or [other], return a resulting RDD that contains a tuple with the - * list of values for that key in [this] as well as [other]. - */ -fun JavaRDD>.cogroup( - other: JavaRDD>, - numPartitions: Int, -): JavaRDD, Iterable>>> = - toJavaPairRDD().cogroup(other.toJavaPairRDD(), numPartitions).toTupleRDD() - -/** - * For each key k in [this] or [other1] or [other2], return a resulting RDD that contains a - * tuple with the list of values for that key in [this], [other1] and [other2]. - */ -fun JavaRDD>.cogroup( - other1: JavaRDD>, - other2: JavaRDD>, - numPartitions: Int, -): JavaRDD, Iterable, Iterable>>> = - toJavaPairRDD() - .cogroup(other1.toJavaPairRDD(), other2.toJavaPairRDD(), numPartitions) - .toTupleRDD() - -/** - * For each key k in [this] or [other1] or [other2] or [other3], - * return a resulting RDD that contains a tuple with the list of values - * for that key in [this], [other1], [other2] and [other3]. - */ -fun JavaRDD>.cogroup( - other1: JavaRDD>, - other2: JavaRDD>, - other3: JavaRDD>, - numPartitions: Int, -): JavaRDD, Iterable, Iterable, Iterable>>> = - toJavaPairRDD() - .cogroup(other1.toJavaPairRDD(), other2.toJavaPairRDD(), other3.toJavaPairRDD(), numPartitions) - .toTupleRDD() - - -/** Alias for [cogroup]. 
*/ -fun JavaRDD>.groupWith( - other: JavaRDD>, -): JavaRDD, Iterable>>> = - toJavaPairRDD().groupWith(other.toJavaPairRDD()).toTupleRDD() - -/** Alias for [cogroup]. */ -fun JavaRDD>.groupWith( - other1: JavaRDD>, - other2: JavaRDD>, -): JavaRDD, Iterable, Iterable>>> = - toJavaPairRDD().groupWith(other1.toJavaPairRDD(), other2.toJavaPairRDD()).toTupleRDD() - -/** Alias for [cogroup]. */ -fun JavaRDD>.groupWith( - other1: JavaRDD>, - other2: JavaRDD>, - other3: JavaRDD>, -): JavaRDD, Iterable, Iterable, Iterable>>> = - toJavaPairRDD().groupWith(other1.toJavaPairRDD(), other2.toJavaPairRDD(), other3.toJavaPairRDD()).toTupleRDD() - -/** - * Return an RDD with the pairs from [this] whose keys are not in [other]. - * - * Uses [this] partitioner/partition size, because even if [other] is huge, the resulting - * RDD will be less than or equal to us. - */ -fun JavaRDD>.subtractByKey(other: JavaRDD>): JavaRDD> = - toJavaPairRDD().subtractByKey(other.toJavaPairRDD()).toTupleRDD() - -/** - * Return an RDD with the pairs from [this] whose keys are not in [other]. - */ -fun JavaRDD>.subtractByKey( - other: JavaRDD>, - numPartitions: Int, -): JavaRDD> = toJavaPairRDD() - .subtractByKey(other.toJavaPairRDD(), numPartitions) - .toTupleRDD() - -/** - * Return an RDD with the pairs from [this] whose keys are not in [other]. - */ -fun JavaRDD>.subtractByKey( - other: JavaRDD>, - p: Partitioner, -): JavaRDD> = toJavaPairRDD() - .subtractByKey(other.toJavaPairRDD(), p) - .toTupleRDD() - -/** - * Return the list of values in the RDD for key [key]. This operation is done efficiently if the - * RDD has a known partitioner by only searching the partition that the key maps to. - */ -fun JavaRDD>.lookup(key: K): List = toJavaPairRDD().lookup(key) - -/** Output the RDD to any Hadoop-supported file system. */ -fun > JavaRDD>.saveAsHadoopFile( - path: String, - keyClass: Class<*>, - valueClass: Class<*>, - outputFormatClass: Class, - conf: JobConf, -): Unit = toJavaPairRDD().saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf) - -/** Output the RDD to any Hadoop-supported file system. */ -fun > JavaRDD>.saveAsHadoopFile( - path: String, - keyClass: Class<*>, - valueClass: Class<*>, - outputFormatClass: Class, -): Unit = toJavaPairRDD().saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass) - -/** Output the RDD to any Hadoop-supported file system, compressing with the supplied codec. */ -fun > JavaRDD>.saveAsHadoopFile( - path: String, - keyClass: Class<*>, - valueClass: Class<*>, - outputFormatClass: Class, - codec: Class, -): Unit = toJavaPairRDD().saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, codec) - -/** Output the RDD to any Hadoop-supported file system. */ -fun > JavaRDD>.saveAsNewAPIHadoopFile( - path: String, - keyClass: Class<*>, - valueClass: Class<*>, - outputFormatClass: Class, - conf: Configuration, -): Unit = toJavaPairRDD().saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf) - -/** - * Output the RDD to any Hadoop-supported storage system, using - * a Configuration object for that storage system. - */ -fun JavaRDD>.saveAsNewAPIHadoopDataset(conf: Configuration): Unit = - toJavaPairRDD().saveAsNewAPIHadoopDataset(conf) - -/** Output the RDD to any Hadoop-supported file system. 
*/ -fun > JavaRDD>.saveAsNewAPIHadoopFile( - path: String, - keyClass: Class<*>, - valueClass: Class<*>, - outputFormatClass: Class, -): Unit = toJavaPairRDD().saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass) - -/** - * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for - * that storage system. The JobConf should set an OutputFormat and any output paths required - * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop - * MapReduce job. - */ -fun JavaRDD>.saveAsHadoopDataset(conf: JobConf): Unit = - toJavaPairRDD().saveAsHadoopDataset(conf) - -/** - * Repartition the RDD according to the given partitioner and, within each resulting partition, - * sort records by their keys. - * - * This is more efficient than calling [JavaRDD.repartition] and then sorting within each partition - * because it can push the sorting down into the shuffle machinery. - */ -fun JavaRDD>.repartitionAndSortWithinPartitions(partitioner: Partitioner): JavaRDD> = - toJavaPairRDD().repartitionAndSortWithinPartitions(partitioner).toTupleRDD() - -/** - * Repartition the RDD according to the given partitioner and, within each resulting partition, - * sort records by their keys. - * - * This is more efficient than calling [JavaRDD.repartition] and then sorting within each partition - * because it can push the sorting down into the shuffle machinery. - */ -fun JavaRDD>.repartitionAndSortWithinPartitions( - partitioner: Partitioner, - comp: Comparator, -): JavaRDD> = toJavaPairRDD().repartitionAndSortWithinPartitions(partitioner, comp).toTupleRDD() - -/** - * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling - * [JavaRDD.collect] or `save` on the resulting RDD will return or output an ordered list of records - * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in - * order of the keys). - */ -fun JavaRDD>.sortByKey(ascending: Boolean = true): JavaRDD> = - toJavaPairRDD().sortByKey(ascending).toTupleRDD() - -/** - * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling - * [JavaRDD.collect] or `save` on the resulting RDD will return or output an ordered list of records - * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in - * order of the keys). - */ -fun JavaRDD>.sortByKey(ascending: Boolean, numPartitions: Int): JavaRDD> = - toJavaPairRDD().sortByKey(ascending, numPartitions).toTupleRDD() - -/** - * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling - * [JavaRDD.collect] or `save` on the resulting RDD will return or output an ordered list of records - * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in - * order of the keys). - */ -fun JavaRDD>.sortByKey(comp: Comparator, ascending: Boolean = true): JavaRDD> = - toJavaPairRDD().sortByKey(comp, ascending).toTupleRDD() - -/** - * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling - * [JavaRDD.collect] or `save` on the resulting RDD will return or output an ordered list of records - * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in - * order of the keys). 
- */ -fun JavaRDD>.sortByKey( - comp: Comparator, - ascending: Boolean, - numPartitions: Int, -): JavaRDD> = toJavaPairRDD().sortByKey(comp, ascending, numPartitions).toTupleRDD() - -//#if sparkMinor >= 3.1 -/** - * Return a RDD containing only the elements in the inclusive range [lower] to [upper]. - * If the RDD has been partitioned using a [RangePartitioner], then this operation can be - * performed efficiently by only scanning the partitions that might contain matching elements. - * Otherwise, a standard [filter] is applied to all partitions. - * - * @since 3.1.0 - */ -fun JavaRDD>.filterByRange(lower: K, upper: K): JavaRDD> = - toJavaPairRDD().filterByRange(lower, upper).toTupleRDD() - -/** - * Return a RDD containing only the elements in the range [range]. - * If the RDD has been partitioned using a [RangePartitioner], then this operation can be - * performed efficiently by only scanning the partitions that might contain matching elements. - * Otherwise, a standard [filter] is applied to all partitions. - * - * @since 3.1.0 - */ -fun , V> JavaRDD>.filterByRange(range: ClosedRange): JavaRDD> = - filterByRange(range.start, range.endInclusive) - -/** - * Return a RDD containing only the elements in the inclusive range [lower] to [upper]. - * If the RDD has been partitioned using a [RangePartitioner], then this operation can be - * performed efficiently by only scanning the partitions that might contain matching elements. - * Otherwise, a standard [filter] is applied to all partitions. - * - * @since 3.1.0 - */ -fun JavaRDD>.filterByRange( - comp: Comparator, - lower: K, - upper: K, -): JavaRDD> = toJavaPairRDD() - .filterByRange(comp, lower, upper) - .toTupleRDD() - -/** - * Return a RDD containing only the elements in the inclusive range [range]. - * If the RDD has been partitioned using a [RangePartitioner], then this operation can be - * performed efficiently by only scanning the partitions that might contain matching elements. - * Otherwise, a standard [filter] is applied to all partitions. - * - * @since 3.1.0 - */ -fun , V> JavaRDD>.filterByRange( - comp: Comparator, - range: ClosedRange, -): JavaRDD> = toJavaPairRDD() - .filterByRange(comp, range.start, range.endInclusive) - .toTupleRDD() -//#endif - -/** - * Return an RDD with the keys of each tuple. - */ -fun JavaRDD>.keys(): JavaRDD = toJavaPairRDD().keys() - -/** - * Return an RDD with the values of each tuple. - */ -fun JavaRDD>.values(): JavaRDD = toJavaPairRDD().values() - -/** - * Return approximate number of distinct values for each key in this RDD. - * - * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * here. - * - * @param relativeSD Relative accuracy. Smaller values create counters that require more space. - * It must be greater than 0.000017. - * @param partitioner partitioner of the resulting RDD. - */ -fun JavaRDD>.countApproxDistinctByKey( - relativeSD: Double, - partitioner: Partitioner, -): JavaRDD> = toJavaPairRDD().countApproxDistinctByKey(relativeSD, partitioner).toTupleRDD() - -/** - * Return approximate number of distinct values for each key in this RDD. - * - * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * [here](https://doi.org/10.1145/2452376.2452456). - * - * @param relativeSD Relative accuracy. 
Smaller values create counters that require more space. - * It must be greater than 0.000017. - * @param numPartitions number of partitions of the resulting RDD. - */ -fun JavaRDD>.countApproxDistinctByKey( - relativeSD: Double, - numPartitions: Int, -): JavaRDD> = toJavaPairRDD().countApproxDistinctByKey(relativeSD, numPartitions).toTupleRDD() - -/** - * Return approximate number of distinct values for each key in this RDD. - * - * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: - * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available - * [here](https://doi.org/10.1145/2452376.2452456). - * - * @param relativeSD Relative accuracy. Smaller values create counters that require more space. - * It must be greater than 0.000017. - */ -fun JavaRDD>.countApproxDistinctByKey(relativeSD: Double): JavaRDD> = - toJavaPairRDD().countApproxDistinctByKey(relativeSD).toTupleRDD() - diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkSession.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkSession.kt index 00655de0..f70f3f8b 100644 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkSession.kt +++ b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkSession.kt @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -26,39 +26,23 @@ package org.jetbrains.kotlinx.spark.api - import org.apache.hadoop.conf.Configuration -import org.apache.spark.SparkConf -import org.apache.spark.SparkContext -import org.apache.spark.api.java.JavaRDD -import org.apache.spark.api.java.JavaRDDLike -import org.apache.spark.api.java.JavaSparkContext -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.RDD import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession.Builder import org.apache.spark.sql.UDFRegistration -import org.apache.spark.streaming.Duration -import org.apache.spark.streaming.Durations -import org.apache.spark.streaming.api.java.JavaStreamingContext -import org.jetbrains.kotlinx.spark.api.SparkLogLevel.ERROR import org.jetbrains.kotlinx.spark.api.tuples.* -import scala.reflect.ClassTag -import java.io.Serializable /** * This wrapper over [SparkSession] which provides several additional methods to create [org.apache.spark.sql.Dataset]. * * @param spark The current [SparkSession] to wrap */ -class KSparkSession(val spark: SparkSession) { - +class KSparkSession( + val spark: SparkSession, +) { val sparkVersion = /*$"\""+spark+"\""$*/ /*-*/ "nope" - /** Lazy instance of [JavaSparkContext] wrapper around [sparkContext]. */ - val sc: JavaSparkContext by lazy { JavaSparkContext(spark.sparkContext) } - /** Utility method to create dataset from list. 
*/ inline fun List.toDS(): Dataset = toDS(spark) @@ -81,39 +65,10 @@ class KSparkSession(val spark: SparkSession) { inline fun dfOf(vararg arg: T): Dataset = spark.dfOf(*arg) /**Utility method to create dataframe from *array or vararg arguments with given column names */ - inline fun dfOf(colNames: Array, vararg arg: T): Dataset = spark.dfOf(colNames, *arg) - - /** Utility method to create dataset from Scala [RDD]. */ - inline fun RDD.toDS(): Dataset = toDS(spark) - - /** Utility method to create dataset from [JavaRDDLike]. */ - inline fun JavaRDDLike.toDS(): Dataset = toDS(spark) - - /** - * Utility method to create Dataset (Dataframe) from RDD. - * NOTE: [T] must be [Serializable]. - */ - inline fun RDD.toDF(vararg colNames: String): Dataset = toDF(spark, *colNames) - - /** - * Utility method to create Dataset (Dataframe) from JavaRDD. - * NOTE: [T] must be [Serializable]. - */ - inline fun JavaRDDLike.toDF(vararg colNames: String): Dataset = toDF(spark, *colNames) - - /** - * Utility method to create an RDD from a list. - * NOTE: [T] must be [Serializable]. - */ - fun List.toRDD(numSlices: Int = sc.defaultParallelism()): JavaRDD = - sc.toRDD(this, numSlices) - - /** - * Utility method to create an RDD from a list. - * NOTE: [T] must be [Serializable]. - */ - fun rddOf(vararg elements: T, numSlices: Int = sc.defaultParallelism()): JavaRDD = - sc.toRDD(elements.asList(), numSlices) + inline fun dfOf( + colNames: Array, + vararg arg: T, + ): Dataset = spark.dfOf(colNames, *arg) /** * A collection of methods for registering user-defined functions (UDF). @@ -129,81 +84,14 @@ class KSparkSession(val spark: SparkSession) { */ val udf: UDFRegistration get() = spark.udf() - inline fun > - NAMED_UDF.register(): NAMED_UDF = + inline fun > NAMED_UDF.register(): NAMED_UDF = this@KSparkSession.udf.register(namedUdf = this) - inline fun > - UserDefinedFunction.register(name: String): NAMED_UDF = - this@KSparkSession.udf.register(name = name, udf = this) + inline fun > UserDefinedFunction.register( + name: String, + ): NAMED_UDF = this@KSparkSession.udf.register(name = name, udf = this) } -/** - * This wrapper over [SparkSession] and [JavaStreamingContext] provides several additional methods to create [org.apache.spark.sql.Dataset] - */ -class KSparkStreamingSession(@Transient val ssc: JavaStreamingContext) : Serializable { - // Serializable and Transient so that [withSpark] works inside [foreachRDD] and other Spark functions that serialize - - private var runAfterStart: KSparkStreamingSession.() -> Unit = {} - - /** [block] will be run after the streaming session has started from a new context (so not when loading from a checkpoint) - * and before it's terminated. */ - fun setRunAfterStart(block: KSparkStreamingSession.() -> Unit) { - runAfterStart = block - } - - fun invokeRunAfterStart(): Unit = runAfterStart() - - - /** Creates new spark session from given [sc]. */ - fun getSpark(sc: SparkConf): SparkSession = - SparkSession - .builder() - .config(sc) - .getOrCreate() - - /** Creates new spark session from context of given JavaRDD, [rddForConf]. 
*/ - fun getSpark(rddForConf: JavaRDDLike<*, *>): SparkSession = getSpark(rddForConf.context().conf) - - /** Creates new spark session from context of given JavaStreamingContext, [sscForConf] */ - fun getSpark(sscForConf: JavaStreamingContext): SparkSession = getSpark(sscForConf.sparkContext().conf) - - /** - * Helper function to enter Spark scope from [sc] like - * ```kotlin - * withSpark(sc) { // this: KSparkSession - * - * } - * ``` - */ - fun withSpark(sc: SparkConf, func: KSparkSession.() -> T): T = - KSparkSession(getSpark(sc)).func() - - /** - * Helper function to enter Spark scope from a provided like - * when using the `foreachRDD` function. - * ```kotlin - * withSpark(rdd) { // this: KSparkSession - * - * } - * ``` - */ - fun withSpark(rddForConf: JavaRDDLike<*, *>, func: KSparkSession.() -> T): T = - KSparkSession(getSpark(rddForConf)).func() - - /** - * Helper function to enter Spark scope from [sscForConf] like - * ```kotlin - * withSpark(ssc) { // this: KSparkSession - * - * } - * ``` - */ - fun withSpark(sscForConf: JavaStreamingContext, func: KSparkSession.() -> T): T = - KSparkSession(getSpark(sscForConf)).func() -} - - /** * The entry point to programming Spark with the Dataset and DataFrame API. * @@ -211,23 +99,18 @@ class KSparkStreamingSession(@Transient val ssc: JavaStreamingContext) : Seriali */ typealias SparkSession = org.apache.spark.sql.SparkSession -/** - * Control our logLevel. This overrides any user-defined log settings. - * @param level The desired log level as [SparkLogLevel]. - */ -fun SparkContext.setLogLevel(level: SparkLogLevel): Unit = setLogLevel(level.name) - /** Log levels for spark. */ enum class SparkLogLevel { - ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN + ALL, + DEBUG, + ERROR, + FATAL, + INFO, + OFF, + TRACE, + WARN, } -/** - * Returns the Spark context associated with this Spark session. - */ -val SparkSession.sparkContext: SparkContext - get() = sparkContext() - /** * Wrapper for spark creation which allows setting different spark params. 
* @@ -241,30 +124,34 @@ val SparkSession.sparkContext: SparkContext * @param func function which will be executed in context of [KSparkSession] (it means that `this` inside block will point to [KSparkSession]) */ @JvmOverloads -inline fun withSpark( +inline fun withSparkConnect( + remote: String = "sc://localhost", props: Map = emptyMap(), - master: String = SparkConf().get("spark.master", "local[*]"), - appName: String = "Kotlin Spark Sample", - logLevel: SparkLogLevel = ERROR, func: KSparkSession.() -> Unit, ) { - val builder = SparkSession - .builder() - .master(master) - .appName(appName) - .apply { - props.forEach { - when (val value = it.value) { - is String -> config(it.key, value) - is Boolean -> config(it.key, value) - is Long -> config(it.key, value) - is Double -> config(it.key, value) - else -> throw IllegalArgumentException("Cannot set property ${it.key} because value $value of unsupported type ${value::class}") + val defaultArgs = mapOf( + "spark.sql.legacy.allowUntypedScalaUDF" to true, + ) + + val builder = + SparkSession + .builder() + .remote(remote) + .apply { + (defaultArgs + props).forEach { + when (val value = it.value) { + is String -> config(it.key, value) + is Boolean -> config(it.key, value) + is Long -> config(it.key, value) + is Double -> config(it.key, value) + else -> + throw IllegalArgumentException( + "Cannot set property ${it.key} because value $value of unsupported type ${value::class}", + ) + } } } - } - withSpark(builder, logLevel, func) - + withSparkConnect(builder, func) } /** @@ -274,117 +161,20 @@ inline fun withSpark( * @param logLevel Control our logLevel. This overrides any user-defined log settings. * @param func function which will be executed in context of [KSparkSession] (it means that `this` inside block will point to [KSparkSession]) */ - -@JvmOverloads -inline fun withSpark(builder: Builder, logLevel: SparkLogLevel = ERROR, func: KSparkSession.() -> Unit) { +inline fun withSparkConnect( + builder: Builder, + func: KSparkSession.() -> Unit, +) { builder .getOrCreate() .apply { KSparkSession(this).apply { - sparkContext.setLogLevel(logLevel) func() spark.stop() } } } -/** - * Wrapper for spark creation which copies params from [sparkConf]. - * - * @param sparkConf Sets a list of config options based on this. - * @param logLevel Control our logLevel. This overrides any user-defined log settings. - * @param func function which will be executed in context of [KSparkSession] (it means that `this` inside block will point to [KSparkSession]) - */ -@JvmOverloads -inline fun withSpark(sparkConf: SparkConf, logLevel: SparkLogLevel = ERROR, func: KSparkSession.() -> Unit) { - withSpark( - builder = SparkSession.builder().config(sparkConf), - logLevel = logLevel, - func = func, - ) -} - - -/** - * Wrapper for spark streaming creation. `spark: SparkSession` and `ssc: JavaStreamingContext` are provided, started, - * awaited, and stopped automatically. - * The use of a checkpoint directory is optional. - * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be - * recreated from the checkpoint data. If the data does not exist, then the provided factory - * will be used to create a JavaStreamingContext. - * - * @param batchDuration The time interval at which streaming data will be divided into batches. Defaults to 1 - * second. - * @param checkpointPath If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be - * recreated from the checkpoint data. 
If the data does not exist (or `null` is provided), - * then the streaming context will be built using the other provided parameters. - * @param hadoopConf Only used if [checkpointPath] is given. Hadoop configuration if necessary for reading from - * any HDFS compatible file system. - * @param createOnError Only used if [checkpointPath] is given. Whether to create a new JavaStreamingContext if - * there is an error in reading checkpoint data. - * @param props Spark options, value types are runtime-checked for type-correctness. - * @param master Sets the Spark master URL to connect to, such as "local" to run locally, "local[4]" to - * run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster. - * By default, it tries to get the system value "spark.master", otherwise it uses "local[*]". - * @param appName Sets a name for the application, which will be shown in the Spark web UI. - * If no application name is set, a randomly generated name will be used. - * @param timeout The time in milliseconds to wait for the stream to terminate without input. -1 by default, - * this means no timeout. - * @param startStreamingContext Defaults to `true`. If set to `false`, then the streaming context will not be started. - * @param func Function which will be executed in context of [KSparkStreamingSession] (it means that - * `this` inside block will point to [KSparkStreamingSession]) - */ -@JvmOverloads -fun withSparkStreaming( - batchDuration: Duration = Durations.seconds(1L), - checkpointPath: String? = null, - hadoopConf: Configuration = getDefaultHadoopConf(), - createOnError: Boolean = false, - props: Map = emptyMap(), - master: String = SparkConf().get("spark.master", "local[*]"), - appName: String = "Kotlin Spark Sample", - timeout: Long = -1L, - startStreamingContext: Boolean = true, - func: KSparkStreamingSession.() -> Unit, -) { - - // will only be set when a new context is created - var kSparkStreamingSession: KSparkStreamingSession? = null - - val creatingFunc = { - val sc = SparkConf() - .setAppName(appName) - .setMaster(master) - .setAll( - props - .map { (key, value) -> key X value.toString() } - .asScalaIterable() - ) - - val ssc = JavaStreamingContext(sc, batchDuration) - ssc.checkpoint(checkpointPath) - - kSparkStreamingSession = KSparkStreamingSession(ssc) - func(kSparkStreamingSession!!) - - ssc - } - - val ssc = when { - checkpointPath != null -> - JavaStreamingContext.getOrCreate(checkpointPath, creatingFunc, hadoopConf, createOnError) - - else -> creatingFunc() - } - - if (startStreamingContext) { - ssc.start() - kSparkStreamingSession?.invokeRunAfterStart() - } - ssc.awaitTerminationOrTimeout(timeout) - ssc.stop() -} - // calling org.apache.spark.deploy.`SparkHadoopUtil$`.`MODULE$`.get().conf() private fun getDefaultHadoopConf(): Configuration { val klass = Class.forName("org.apache.spark.deploy.SparkHadoopUtil$") @@ -397,37 +187,3 @@ private fun getDefaultHadoopConf(): Configuration { return conf } - -/** - * Broadcast a read-only variable to the cluster, returning a - * [org.apache.spark.broadcast.Broadcast] object for reading it in distributed functions. - * The variable will be sent to each cluster only once. 
- * - * @param value value to broadcast to the Spark nodes - * @return `Broadcast` object, a read-only variable cached on each machine - */ -inline fun SparkSession.broadcast(value: T): Broadcast = try { - sparkContext.broadcast(value, ClassTag.apply(T::class.java)) -} catch (e: ClassNotFoundException) { - JavaSparkContext(sparkContext).broadcast(value) -} - -/** - * Broadcast a read-only variable to the cluster, returning a - * [org.apache.spark.broadcast.Broadcast] object for reading it in distributed functions. - * The variable will be sent to each cluster only once. - * - * @param value value to broadcast to the Spark nodes - * @return `Broadcast` object, a read-only variable cached on each machine - * @see broadcast - */ -@Deprecated( - "You can now use `spark.broadcast()` instead.", - ReplaceWith("spark.broadcast(value)"), - DeprecationLevel.WARNING -) -inline fun SparkContext.broadcast(value: T): Broadcast = try { - broadcast(value, ClassTag.apply(T::class.java)) -} catch (e: ClassNotFoundException) { - JavaSparkContext(this).broadcast(value) -} \ No newline at end of file diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/StreamingKeyValues.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/StreamingKeyValues.kt deleted file mode 100644 index 19b4ea79..00000000 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/StreamingKeyValues.kt +++ /dev/null @@ -1,668 +0,0 @@ -/*- - * =LICENSE= - * Kotlin Spark API: API for Spark 3.2+ (Scala 2.12) - * ---------- - * Copyright (C) 2019 - 2022 JetBrains - * ---------- - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * =LICENSEEND= - */ -@file:Suppress("unused") - -package org.jetbrains.kotlinx.spark.api - -import org.apache.spark.HashPartitioner -import org.apache.spark.Partitioner -import org.apache.spark.api.java.JavaRDD -import org.apache.spark.api.java.Optional -import org.apache.spark.streaming.Duration -import org.apache.spark.streaming.StateSpec -import org.apache.spark.streaming.api.java.JavaDStream -import org.apache.spark.streaming.api.java.JavaMapWithStateDStream -import org.apache.spark.streaming.api.java.JavaPairDStream -import scala.Tuple2 - - -fun JavaDStream>.toPairDStream(): JavaPairDStream = - JavaPairDStream.fromJavaDStream(this) - -fun JavaPairDStream.toTupleDStream(): JavaDStream> = - toJavaDStream() - -/** - * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to - * generate the RDDs with `numPartitions` partitions. - */ -fun JavaDStream>.groupByKey( - numPartitions: Int = dstream().ssc().sc().defaultParallelism(), -): JavaDStream>> = - toPairDStream() - .groupByKey(numPartitions) - .toTupleDStream() - -/** - * Return a new DStream by applying `groupByKey` on each RDD. The supplied - * org.apache.spark.Partitioner is used to control the partitioning of each RDD. 
- */ -fun JavaDStream>.groupByKey(partitioner: Partitioner): JavaDStream>> = - toPairDStream() - .groupByKey(partitioner) - .toTupleDStream() - -/** - * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs - * with `numPartitions` partitions. - */ -fun JavaDStream>.reduceByKey( - numPartitions: Int = dstream().ssc().sc().defaultParallelism(), - reduceFunc: (V, V) -> V, -): JavaDStream> = - toPairDStream() - .reduceByKey(reduceFunc, numPartitions) - .toTupleDStream() - -/** - * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are - * merged using the supplied reduce function. org.apache.spark.Partitioner is used to control - * the partitioning of each RDD. - */ -fun JavaDStream>.reduceByKey( - partitioner: Partitioner, - reduceFunc: (V, V) -> V, -): JavaDStream> = - toPairDStream() - .reduceByKey(reduceFunc, partitioner) - .toTupleDStream() - -/** - * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the - * combineByKey for RDDs. Please refer to combineByKey in - * org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information. - */ -fun JavaDStream>.combineByKey( - createCombiner: (V) -> C, - mergeValue: (C, V) -> C, - mergeCombiner: (C, C) -> C, - numPartitions: Int = dstream().ssc().sc().defaultParallelism(), - mapSideCombine: Boolean = true, -): JavaDStream> = - toPairDStream() - .combineByKey(createCombiner, mergeValue, mergeCombiner, HashPartitioner(numPartitions), mapSideCombine) - .toTupleDStream() - -/** - * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the - * combineByKey for RDDs. Please refer to combineByKey in - * org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information. - */ -fun JavaDStream>.combineByKey( - createCombiner: (V) -> C, - mergeValue: (C, V) -> C, - mergeCombiner: (C, C) -> C, - partitioner: Partitioner, - mapSideCombine: Boolean = true, -): JavaDStream> = - toPairDStream() - .combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine) - .toTupleDStream() - -/** - * Return a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * Similar to `DStream.groupByKey()`, but applies it over a sliding window. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param numPartitions number of partitions of each RDD in the new DStream; if not specified - * then Spark's default number of partitions will be used - */ -fun JavaDStream>.groupByKeyAndWindow( - windowDuration: Duration, - slideDuration: Duration = dstream().slideDuration(), - numPartitions: Int = dstream().ssc().sc().defaultParallelism(), -): JavaDStream>> = - toPairDStream() - .groupByKeyAndWindow(windowDuration, slideDuration, numPartitions) - .toTupleDStream() - -/** - * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * Similar to `DStream.groupByKey()`, but applies it over a sliding window. 
- * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD in the new - * DStream. - */ -fun JavaDStream>.groupByKeyAndWindow( - windowDuration: Duration, - slideDuration: Duration = dstream().slideDuration(), - partitioner: Partitioner, -): JavaDStream>> = - toPairDStream() - .groupByKeyAndWindow(windowDuration, slideDuration, partitioner) - .toTupleDStream() - -/** - * Return a new DStream by applying `reduceByKey` over a sliding window. This is similar to - * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to - * generate the RDDs with `numPartitions` partitions. - * @param reduceFunc associative and commutative reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param numPartitions number of partitions of each RDD in the new DStream. - */ -fun JavaDStream>.reduceByKeyAndWindow( - windowDuration: Duration, - slideDuration: Duration = dstream().slideDuration(), - numPartitions: Int = dstream().ssc().sc().defaultParallelism(), - reduceFunc: (V, V) -> V, -): JavaDStream> = - toPairDStream() - .reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions) - .toTupleDStream() - -/** - * Return a new DStream by applying `reduceByKey` over a sliding window. Similar to - * `DStream.reduceByKey()`, but applies it over a sliding window. - * @param reduceFunc associative and commutative reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD - * in the new DStream. - */ -fun JavaDStream>.reduceByKeyAndWindow( - windowDuration: Duration, - slideDuration: Duration = dstream().slideDuration(), - partitioner: Partitioner, - reduceFunc: (V, V) -> V, -): JavaDStream> = - toPairDStream() - .reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner) - .toTupleDStream() - -/** - * Return a new DStream by applying incremental `reduceByKey` over a sliding window. - * The reduced value of over a new window is calculated using the old window's reduced value : - * 1. reduce the new values that entered the window (e.g., adding new counts) - * - * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) - * - * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. - * However, it is applicable to only "invertible reduce functions". - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. 
- * @param reduceFunc associative and commutative reduce function - * @param invReduceFunc inverse reduce function; such that for all y, invertible x: - * `invReduceFunc(reduceFunc(x, y), x) = y` - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param filterFunc Optional function to filter expired key-value pairs; - * only pairs that satisfy the function are retained - */ -fun JavaDStream>.reduceByKeyAndWindow( - invReduceFunc: (V, V) -> V, - windowDuration: Duration, - slideDuration: Duration = dstream().slideDuration(), - numPartitions: Int = dstream().ssc().sc().defaultParallelism(), - filterFunc: ((Tuple2) -> Boolean)? = null, - reduceFunc: (V, V) -> V, -): JavaDStream> = - toPairDStream() - .reduceByKeyAndWindow( - /* reduceFunc = */ reduceFunc, - /* invReduceFunc = */ invReduceFunc, - /* windowDuration = */ windowDuration, - /* slideDuration = */ slideDuration, - /* numPartitions = */ numPartitions, - /* filterFunc = */ filterFunc?.let { - { tuple: Tuple2 -> - filterFunc(tuple) - } - } - ) - .toTupleDStream() - -/** - * Return a new DStream by applying incremental `reduceByKey` over a sliding window. - * The reduced value of over a new window is calculated using the old window's reduced value : - * 1. reduce the new values that entered the window (e.g., adding new counts) - * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) - * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function. - * However, it is applicable to only "invertible reduce functions". - * @param reduceFunc associative and commutative reduce function - * @param invReduceFunc inverse reduce function - * @param windowDuration width of the window; must be a multiple of this DStream's - * batching interval - * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's batching interval - * @param partitioner partitioner for controlling the partitioning of each RDD in the new - * DStream. - * @param filterFunc Optional function to filter expired key-value pairs; - * only pairs that satisfy the function are retained - */ -fun JavaDStream>.reduceByKeyAndWindow( - invReduceFunc: (V, V) -> V, - windowDuration: Duration, - slideDuration: Duration = dstream().slideDuration(), - partitioner: Partitioner, - filterFunc: ((Tuple2) -> Boolean)? = null, - reduceFunc: (V, V) -> V, -): JavaDStream> = - toPairDStream() - .reduceByKeyAndWindow( - /* reduceFunc = */ reduceFunc, - /* invReduceFunc = */ invReduceFunc, - /* windowDuration = */ windowDuration, - /* slideDuration = */ slideDuration, - /* partitioner = */ partitioner, - /* filterFunc = */ filterFunc?.let { - { tuple: Tuple2 -> - filterFunc(tuple) - } - } - ) - .toTupleDStream() - -/** - * Return a [JavaMapWithStateDStream] by applying a function to every key-value element of - * `this` stream, while maintaining some state data for each unique key. The mapping function - * and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this - * transformation can be specified using `StateSpec` class. The state data is accessible in - * as a parameter of type `State` in the mapping function. 
- * - * Example of using `mapWithState`: - * ```kotlin - * // A mapping function that maintains an integer state and return a String - * fun mappingFunction(key: String, value: Optional, state: State): Optional { - * // Use state.exists(), state.get(), state.update() and state.remove() - * // to manage state, and return the necessary string - * } - * - * val spec = StateSpec.function(::mappingFunction).numPartitions(10) - * - * val mapWithStateDStream = keyValueDStream.mapWithState(spec) - * ``` - * - * @param spec Specification of this transformation - * @tparam StateType Class type of the state data - * @tparam MappedType Class type of the mapped data - */ -fun JavaDStream>.mapWithState( - spec: StateSpec, -): JavaMapWithStateDStream = - toPairDStream().mapWithState(spec) - -/** - * Return a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of each key. - * In every batch the updateFunc will be called for each state even if there are no new values. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. - * Note: Needs checkpoint directory to be set. - * @param updateFunc State update function. If `this` function returns `null`, then - * corresponding state key-value pair will be eliminated. - * @tparam S State type - */ -@JvmName("updateStateByKeyNullable") -fun JavaDStream>.updateStateByKey( - numPartitions: Int = dstream().ssc().sc().defaultParallelism(), - updateFunc: (List, S?) -> S?, -): JavaDStream> = - toPairDStream() - .updateStateByKey( - { list: List, s: Optional -> - updateFunc(list, s.getOrNull()).toOptional() - }, - numPartitions, - ) - .toTupleDStream() - -/** - * Return a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of each key. - * In every batch the updateFunc will be called for each state even if there are no new values. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. - * Note: Needs checkpoint directory to be set. - * @param updateFunc State update function. If `this` function returns `null`, then - * corresponding state key-value pair will be eliminated. - * @tparam S State type - */ -@JvmName("updateStateByKey") -fun JavaDStream>.updateStateByKey( - numPartitions: Int = dstream().ssc().sc().defaultParallelism(), - updateFunc: (List, Optional) -> Optional, -): JavaDStream> = - toPairDStream() - .updateStateByKey( - updateFunc, - numPartitions, - ) - .toTupleDStream() - -/** - * Return a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of each key. - * In every batch the updateFunc will be called for each state even if there are no new values. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. - * Note: Needs checkpoint directory to be set. - * @param updateFunc State update function. Note, that this function may generate a different - * tuple with a different key than the input key. Therefore keys may be removed - * or added in this way. It is up to the developer to decide whether to - * remember the partitioner despite the key being changed. 
- * @param partitioner Partitioner for controlling the partitioning of each RDD in the new - * DStream - * @tparam S State type - */ -@JvmName("updateStateByKeyNullable") -fun JavaDStream>.updateStateByKey( - partitioner: Partitioner, - updateFunc: (List, S?) -> S?, -): JavaDStream> = - toPairDStream() - .updateStateByKey( - { list: List, s: Optional -> - updateFunc(list, s.getOrNull()).toOptional() - }, - partitioner, - ) - .toTupleDStream() - -/** - * Return a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of each key. - * In every batch the updateFunc will be called for each state even if there are no new values. - * [[org.apache.spark.Partitioner]] is used to control the partitioning of each RDD. - * Note: Needs checkpoint directory to be set. - * @param updateFunc State update function. Note, that this function may generate a different - * tuple with a different key than the input key. Therefore keys may be removed - * or added in this way. It is up to the developer to decide whether to - * remember the partitioner despite the key being changed. - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new - * DStream - * @tparam S State type - */ -fun JavaDStream>.updateStateByKey( - partitioner: Partitioner, - updateFunc: (List, Optional) -> Optional, -): JavaDStream> = - toPairDStream() - .updateStateByKey( - updateFunc, - partitioner, - ) - .toTupleDStream() - -/** - * Return a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of the key. - * org.apache.spark.Partitioner is used to control the partitioning of each RDD. - * Note: Needs checkpoint directory to be set. - * @param updateFunc State update function. If `this` function returns `null`, then - * corresponding state key-value pair will be eliminated. - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new - * DStream. - * @param initialRDD initial state value of each key. - * @tparam S State type - */ -@JvmName("updateStateByKeyNullable") -fun JavaDStream>.updateStateByKey( - partitioner: Partitioner, - initialRDD: JavaRDD>, - updateFunc: (List, S?) -> S?, -): JavaDStream> = - toPairDStream() - .updateStateByKey( - { list: List, s: Optional -> - updateFunc(list, s.getOrNull()).toOptional() - }, - partitioner, - initialRDD.toJavaPairRDD(), - ) - .toTupleDStream() - -/** - * Return a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of the key. - * org.apache.spark.Partitioner is used to control the partitioning of each RDD. - * Note: Needs checkpoint directory to be set. - * @param updateFunc State update function. If `this` function returns `null`, then - * corresponding state key-value pair will be eliminated. - * @param partitioner Partitioner for controlling the partitioning of each RDD in the new - * DStream. - * @param initialRDD initial state value of each key. 
- * @tparam S State type - */ -fun JavaDStream>.updateStateByKey( - partitioner: Partitioner, - initialRDD: JavaRDD>, - updateFunc: (List, Optional) -> Optional, -): JavaDStream> = - toPairDStream() - .updateStateByKey( - updateFunc, - partitioner, - initialRDD.toJavaPairRDD(), - ) - .toTupleDStream() - - -/** - * Return a new DStream by applying a map function to the value of each key-value pairs in - * 'this' DStream without changing the key. - */ -fun JavaDStream>.mapValues( - mapValuesFunc: (V) -> U, -): JavaDStream> = - toPairDStream() - .mapValues(mapValuesFunc) - .toTupleDStream() - -/** - * Return a new DStream by applying a flatmap function to the value of each key-value pairs in - * 'this' DStream without changing the key. - */ -fun JavaDStream>.flatMapValues( - flatMapValuesFunc: (V) -> Iterator, -): JavaDStream> = - toPairDStream() - .flatMapValues(flatMapValuesFunc) - .toTupleDStream() - -/** - * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - */ -fun JavaDStream>.cogroup( - other: JavaDStream>, - numPartitions: Int = dstream().ssc().sc().defaultParallelism(), -): JavaDStream, Iterable>>> = - toPairDStream() - .cogroup( - other.toPairDStream(), - numPartitions, - ) - .toTupleDStream() - - -/** - * Return a new DStream by applying 'cogroup' between RDDs of `this` DStream and `other` DStream. - * The supplied org.apache.spark.Partitioner is used to partition the generated RDDs. - */ -fun JavaDStream>.cogroup( - other: JavaDStream>, - partitioner: Partitioner, -): JavaDStream, Iterable>>> = - toPairDStream() - .cogroup( - other.toPairDStream(), - partitioner, - ) - .toTupleDStream() - -/** - * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. - */ -fun JavaDStream>.join( - other: JavaDStream>, - numPartitions: Int = dstream().ssc().sc().defaultParallelism(), -): JavaDStream>> = - toPairDStream() - .join( - other.toPairDStream(), - numPartitions, - ) - .toTupleDStream() - -/** - * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream. - * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD. - */ -fun JavaDStream>.join( - other: JavaDStream>, - partitioner: Partitioner, -): JavaDStream>> = - toPairDStream() - .join( - other.toPairDStream(), - partitioner, - ) - .toTupleDStream() - -/** - * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and - * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` - * partitions. - */ -fun JavaDStream>.leftOuterJoin( - other: JavaDStream>, - numPartitions: Int = dstream().ssc().sc().defaultParallelism(), -): JavaDStream>>> = - toPairDStream() - .leftOuterJoin( - other.toPairDStream(), - numPartitions, - ) - .toTupleDStream() - -/** - * Return a new DStream by applying 'left outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied org.apache.spark.Partitioner is used to control - * the partitioning of each RDD. - */ -fun JavaDStream>.leftOuterJoin( - other: JavaDStream>, - partitioner: Partitioner, -): JavaDStream>>> = - toPairDStream() - .leftOuterJoin( - other.toPairDStream(), - partitioner, - ) - .toTupleDStream() - -/** - * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and - * `other` DStream. 
Hash partitioning is used to generate the RDDs with `numPartitions` - * partitions. - */ -fun JavaDStream>.rightOuterJoin( - other: JavaDStream>, - numPartitions: Int = dstream().ssc().sc().defaultParallelism(), -): JavaDStream, W>>> = - toPairDStream() - .rightOuterJoin( - other.toPairDStream(), - numPartitions, - ) - .toTupleDStream() - -/** - * Return a new DStream by applying 'right outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied org.apache.spark.Partitioner is used to control - * the partitioning of each RDD. - */ -fun JavaDStream>.rightOuterJoin( - other: JavaDStream>, - partitioner: Partitioner, -): JavaDStream, W>>> = - toPairDStream() - .rightOuterJoin( - other.toPairDStream(), - partitioner, - ) - .toTupleDStream() - -/** - * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and - * `other` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` - * partitions. - */ -fun JavaDStream>.fullOuterJoin( - other: JavaDStream>, - numPartitions: Int = dstream().ssc().sc().defaultParallelism(), -): JavaDStream, Optional>>> = - toPairDStream() - .fullOuterJoin( - other.toPairDStream(), - numPartitions, - ) - .toTupleDStream() - -/** - * Return a new DStream by applying 'full outer join' between RDDs of `this` DStream and - * `other` DStream. The supplied org.apache.spark.Partitioner is used to control - * the partitioning of each RDD. - */ -fun JavaDStream>.fullOuterJoin( - other: JavaDStream>, - partitioner: Partitioner, -): JavaDStream, Optional>>> = - toPairDStream() - .fullOuterJoin( - other.toPairDStream(), - partitioner, - ) - .toTupleDStream() - -/** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is - * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". - */ -fun JavaDStream>.saveAsHadoopFiles( - prefix: String, - suffix: String, -): Unit = toPairDStream().saveAsHadoopFiles(prefix, suffix) - -/** - * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is - * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". - */ -fun JavaDStream>.saveAsNewAPIHadoopFiles( - prefix: String, - suffix: String, -): Unit = toPairDStream().saveAsNewAPIHadoopFiles(prefix, suffix) diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/UserDefinedAggregateFunction.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/UserDefinedAggregateFunction.kt deleted file mode 100644 index 11e14c5f..00000000 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/UserDefinedAggregateFunction.kt +++ /dev/null @@ -1,287 +0,0 @@ -/*- - * =LICENSE= - * Kotlin Spark API: API for Spark 3.2+ (Scala 2.12) - * ---------- - * Copyright (C) 2019 - 2022 JetBrains - * ---------- - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * =LICENSEEND= - */ -package org.jetbrains.kotlinx.spark.api - -import org.apache.spark.sql.Encoder -import org.apache.spark.sql.UDFRegistration -import org.apache.spark.sql.expressions.Aggregator -import org.apache.spark.sql.functions -import java.io.Serializable -import kotlin.reflect.typeOf - -/** Creates an [Aggregator] in functional manner. - * - * @param zero A zero value for this aggregation. Should satisfy the property that any b + zero = b. - * @param reduce Combine two values to produce a new value. For performance, the function may modify `b` and - * return it instead of constructing new object for b. - * @param merge Merge two intermediate values. - * @param finish Transform the output of the reduction. - * @param bufferEncoder Optional. Specifies the `Encoder` for the intermediate value type. - * @param outputEncoder Optional. Specifies the `Encoder` for the final output value type. - * */ -inline fun aggregatorOf( - noinline zero: () -> BUF, - noinline reduce: (b: BUF, a: IN) -> BUF, - noinline merge: (b1: BUF, b2: BUF) -> BUF, - noinline finish: (reduction: BUF) -> OUT, - bufferEncoder: Encoder = kotlinEncoderFor(), - outputEncoder: Encoder = kotlinEncoderFor(), -): Aggregator = Aggregator(zero, reduce, merge, finish, bufferEncoder, outputEncoder) - -class Aggregator( - zero: () -> BUF, - reduce: (b: BUF, a: IN) -> BUF, - merge: (b1: BUF, b2: BUF) -> BUF, - finish: (reduction: BUF) -> OUT, - private val bufferEncoder: Encoder, - private val outputEncoder: Encoder, -) : Aggregator(), Serializable { - - private val _zero: () -> BUF = zero - private val _reduce: (b: BUF, a: IN) -> BUF = reduce - private val _merge: (b1: BUF, b2: BUF) -> BUF = merge - private val _finish: (reduction: BUF) -> OUT = finish - - override fun zero(): BUF = _zero() - override fun reduce(b: BUF, a: IN): BUF = _reduce(b, a) - override fun merge(b1: BUF, b2: BUF): BUF = _merge(b1, b2) - override fun finish(reduction: BUF): OUT = _finish(reduction) - override fun bufferEncoder(): Encoder = bufferEncoder - override fun outputEncoder(): Encoder = outputEncoder -} - - -/** - * Obtains a [NamedUserDefinedFunction1] that wraps the given [agg] so that it may be used with Data Frames. - * @see functions.udaf - * - * @param name Optional. Tries to obtain name from the class of [agg] if not supplied. - * Use [udafUnnamed] if no name is wanted. - * @param agg the given [Aggregator] to convert into a UDAF. Can also be created using [aggregatorOf]. - * @param nondeterministic Optional. If true, sets the UserDefinedFunction as nondeterministic. - * - * @return a [NamedUserDefinedFunction1] that can be used as an aggregating expression - * - * @see udaf for a named variant. - */ -inline fun > udaf( - agg: AGG, - nondeterministic: Boolean = false, -): NamedUserDefinedFunction1 = udaf( - name = agg::class.simpleName - ?: error("Could not obtain name from [agg], either supply a name or use [udafUnnamed()]"), - agg = agg, - nondeterministic = nondeterministic, -) - -/** - * Obtains a [NamedUserDefinedFunction1] that wraps the given [agg] so that it may be used with Data Frames. - * @see functions.udaf - * - * @param name Optional. Tries to obtain name from the class of [agg] if not supplied. - * Use [udafUnnamed] if no name is wanted. - * @param agg the given [Aggregator] to convert into a UDAF. Can also be created using [aggregatorOf]. - * @param nondeterministic Optional. If true, sets the UserDefinedFunction as nondeterministic. 
- * - * @return a [NamedUserDefinedFunction1] that can be used as an aggregating expression - * - * @see udaf for a named variant. - */ -inline fun > udaf( - name: String, - agg: AGG, - nondeterministic: Boolean = false, -): NamedUserDefinedFunction1 = - udafUnnamed(agg = agg, nondeterministic = nondeterministic).withName(name) - -/** - * Obtains a [UserDefinedFunction1] that wraps the given [agg] so that it may be used with Data Frames. - * @see functions.udaf - * - * @param agg the given [Aggregator] to convert into a UDAF. Can also be created using [aggregatorOf]. - * @param nondeterministic Optional. If true, sets the UserDefinedFunction as nondeterministic. - * - * @return a [UserDefinedFunction1] that can be used as an aggregating expression - * - * @see udaf for a named variant. - */ -inline fun > udafUnnamed( - agg: AGG, - nondeterministic: Boolean = false, -): UserDefinedFunction1 { - IN::class.checkForValidType("IN") - - return UserDefinedFunction1( - udf = functions.udaf(agg, kotlinEncoderFor()) - .let { if (nondeterministic) it.asNondeterministic() else it } - .let { if (typeOf().isMarkedNullable) it else it.asNonNullable() }, - encoder = kotlinEncoderFor(), - ) -} - -/** - * Obtains a [UserDefinedFunction1] created from an [Aggregator] created by the given arguments - * so that it may be used with Data Frames. - * @see functions.udaf - * @see aggregatorOf - * - * @param zero A zero value for this aggregation. Should satisfy the property that any b + zero = b. - * @param reduce Combine two values to produce a new value. For performance, the function may modify `b` and - * return it instead of constructing new object for b. - * @param merge Merge two intermediate values. - * @param finish Transform the output of the reduction. - * @param bufferEncoder Optional. Specifies the `Encoder` for the intermediate value type. - * @param outputEncoder Optional. Specifies the `Encoder` for the final output value type. - * @param nondeterministic Optional. If true, sets the UserDefinedFunction as nondeterministic. - * - * @return a [UserDefinedFunction1] that can be used as an aggregating expression - * - * @see udaf for a named variant. - */ -inline fun udaf( - noinline zero: () -> BUF, - noinline reduce: (b: BUF, a: IN) -> BUF, - noinline merge: (b1: BUF, b2: BUF) -> BUF, - noinline finish: (reduction: BUF) -> OUT, - bufferEncoder: Encoder = kotlinEncoderFor(), - outputEncoder: Encoder = kotlinEncoderFor(), - nondeterministic: Boolean = false, -): UserDefinedFunction1 = udafUnnamed( - aggregatorOf( - zero = zero, - reduce = reduce, - merge = merge, - finish = finish, - bufferEncoder = bufferEncoder, - outputEncoder = outputEncoder, - ), - nondeterministic = nondeterministic, -) - - -/** - * Obtains a [NamedUserDefinedFunction1] that wraps the given [agg] so that it may be used with Data Frames. - * so that it may be used with Data Frames. - * @see functions.udaf - * @see aggregatorOf - * - * @param name Optional. Name for the UDAF. - * @param zero A zero value for this aggregation. Should satisfy the property that any b + zero = b. - * @param reduce Combine two values to produce a new value. For performance, the function may modify `b` and - * return it instead of constructing new object for b. - * @param merge Merge two intermediate values. - * @param finish Transform the output of the reduction. - * @param bufferEncoder Optional. Specifies the `Encoder` for the intermediate value type. - * @param outputEncoder Optional. Specifies the `Encoder` for the final output value type. 
- * @param nondeterministic Optional. If true, sets the UserDefinedFunction as nondeterministic. - * - * @return a [UserDefinedFunction1] that can be used as an aggregating expression - * - * @see udafUnnamed for an unnamed variant. - */ -inline fun udaf( - name: String, - noinline zero: () -> BUF, - noinline reduce: (b: BUF, a: IN) -> BUF, - noinline merge: (b1: BUF, b2: BUF) -> BUF, - noinline finish: (reduction: BUF) -> OUT, - bufferEncoder: Encoder = kotlinEncoderFor(), - outputEncoder: Encoder = kotlinEncoderFor(), - nondeterministic: Boolean = false, -): NamedUserDefinedFunction1 = udaf( - name = name, - agg = aggregatorOf( - zero = zero, - reduce = reduce, - merge = merge, - finish = finish, - bufferEncoder = bufferEncoder, - outputEncoder = outputEncoder, - ), - nondeterministic = nondeterministic, -) - -/** - * Registers [agg] as a UDAF for SQL. Returns the UDAF as [NamedUserDefinedFunction]. - * Obtains a [NamedUserDefinedFunction1] that wraps the given [agg] so that it may be used with Data Frames. - * @see UDFRegistration.register - * @see functions.udaf - * - * @param agg the given [Aggregator] to convert into a UDAF. Can also be created using [aggregatorOf]. - * @param name Optional. Tries to obtain name from the class of [agg] if not supplied. - * Use [udafUnnamed] if no name is wanted. - * @param nondeterministic Optional. If true, sets the UserDefinedFunction as nondeterministic. - * - * @return a [NamedUserDefinedFunction1] that can be used as an aggregating expression - */ -inline fun UDFRegistration.register( - name: String, - agg: Aggregator, - nondeterministic: Boolean = false, -): NamedUserDefinedFunction1 = register(udaf(name, agg, nondeterministic)) - -/** - * Registers [agg] as a UDAF for SQL. Returns the UDAF as [NamedUserDefinedFunction]. - * Obtains a [NamedUserDefinedFunction1] that wraps the given [agg] so that it may be used with Data Frames. - * @see UDFRegistration.register - * @see functions.udaf - * - * @param agg the given [Aggregator] to convert into a UDAF. Can also be created using [aggregatorOf]. - * @param name Optional. Tries to obtain name from the class of [agg] if not supplied. - * Use [udafUnnamed] if no name is wanted. - * @param nondeterministic Optional. If true, sets the UserDefinedFunction as nondeterministic. - * - * @return a [NamedUserDefinedFunction1] that can be used as an aggregating expression - */ -inline fun UDFRegistration.register( - agg: Aggregator, - nondeterministic: Boolean = false, -): NamedUserDefinedFunction1 = register(udaf(agg, nondeterministic)) - -/** - * Registers a UDAF for SQL based on the given arguments. Returns the UDAF as [NamedUserDefinedFunction]. - * Obtains a [NamedUserDefinedFunction1] that wraps the given [agg] so that it may be used with Data Frames. - * @see UDFRegistration.register - * @see functions.udaf - * - * @param name Optional. Name for the UDAF. - * @param zero A zero value for this aggregation. Should satisfy the property that any b + zero = b. - * @param reduce Combine two values to produce a new value. For performance, the function may modify `b` and - * return it instead of constructing new object for b. - * @param merge Merge two intermediate values. - * @param finish Transform the output of the reduction. - * @param bufferEncoder Optional. Specifies the `Encoder` for the intermediate value type. - * @param outputEncoder Optional. Specifies the `Encoder` for the final output value type. - * @param nondeterministic Optional. If true, sets the UserDefinedFunction as nondeterministic. 
- * - * @return a [NamedUserDefinedFunction1] that can be used as an aggregating expression. - */ -inline fun UDFRegistration.register( - name: String, - noinline zero: () -> BUF, - noinline reduce: (b: BUF, a: IN) -> BUF, - noinline merge: (b1: BUF, b2: BUF) -> BUF, - noinline finish: (reduction: BUF) -> OUT, - bufferEncoder: Encoder = kotlinEncoderFor(), - outputEncoder: Encoder = kotlinEncoderFor(), - nondeterministic: Boolean = false, -): NamedUserDefinedFunction1 = register( - udaf(name, zero, reduce, merge, finish, bufferEncoder, outputEncoder, nondeterministic) -) diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/UserDefinedFunctionVararg.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/UserDefinedFunctionVararg.kt index 6ffd9ff6..7f0d45e8 100644 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/UserDefinedFunctionVararg.kt +++ b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/UserDefinedFunctionVararg.kt @@ -25,7 +25,6 @@ package org.jetbrains.kotlinx.spark.api import org.apache.spark.sql.* import org.jetbrains.kotlinx.spark.extensions.VarargUnwrapper import org.apache.spark.sql.api.java.* -import org.apache.spark.sql.internal.SQLConf import kotlin.reflect.* import org.apache.spark.sql.expressions.UserDefinedFunction as SparkUserDefinedFunction @@ -83,15 +82,16 @@ class NamedUserDefinedFunctionVararg( @PublishedApi internal inline fun withAllowUntypedScalaUDF(block: () -> R): R { - val sqlConf = SQLConf.get() - val confString = "spark.sql.legacy.allowUntypedScalaUDF" - val prev = sqlConf.getConfString(confString, "false") - sqlConf.setConfString(confString, "true") - return try { - block() - } finally { - sqlConf.setConfString(confString, prev) - } +// val sqlConf = SQLConf.get() +// val confString = "spark.sql.legacy.allowUntypedScalaUDF" +// val prev = sqlConf.getConfString(confString, "false") +// sqlConf.setConfString(confString, "true") +// return try { + return block() +// } +// finally { +// sqlConf.setConfString(confString, prev) +// } } diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/DatePeriodUdt.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/DatePeriodUdt.kt index 3705cb5a..4064223e 100644 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/DatePeriodUdt.kt +++ b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/DatePeriodUdt.kt @@ -1,27 +1,28 @@ package org.jetbrains.kotlinx.spark.api.udts import kotlinx.datetime.DatePeriod -import kotlinx.datetime.toJavaPeriod import kotlinx.datetime.toKotlinDatePeriod -import org.apache.spark.sql.catalyst.util.IntervalUtils import org.apache.spark.sql.types.UserDefinedType import org.apache.spark.sql.types.YearMonthIntervalType +import java.time.Period /** * NOTE: Just like java.time.DatePeriod, this is truncated to months. */ class DatePeriodUdt : UserDefinedType() { - override fun userClass(): Class = DatePeriod::class.java + override fun deserialize(datum: Any?): DatePeriod? = when (datum) { null -> null - is Int -> IntervalUtils.monthsToPeriod(datum).toKotlinDatePeriod() + is Int -> Period.ofMonths(datum).toKotlinDatePeriod() else -> throw IllegalArgumentException("Unsupported datum: $datum") } override fun serialize(obj: DatePeriod?): Int? 
= - obj?.let { IntervalUtils.periodToMonths(it.toJavaPeriod()) } + obj?.let { + it.years * 12 + it.months + } override fun sqlType(): YearMonthIntervalType = YearMonthIntervalType.apply() -} \ No newline at end of file +} diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/DurationUdt.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/DurationUdt.kt index ff1e5df4..3e6c40f4 100644 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/DurationUdt.kt +++ b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/DurationUdt.kt @@ -1,46 +1,46 @@ -package org.jetbrains.kotlinx.spark.api.udts - -import org.apache.spark.sql.catalyst.util.IntervalUtils -import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.types.DayTimeIntervalType -import org.apache.spark.sql.types.UserDefinedType -import kotlin.time.Duration -import kotlin.time.Duration.Companion.milliseconds -import kotlin.time.Duration.Companion.nanoseconds -import kotlin.time.toJavaDuration -import kotlin.time.toKotlinDuration - -// TODO Fails, likely because Duration is a value class. -class DurationUdt : UserDefinedType() { - - override fun userClass(): Class = Duration::class.java - override fun deserialize(datum: Any?): Duration? = - when (datum) { - null -> null - is Long -> IntervalUtils.microsToDuration(datum).toKotlinDuration() -// is Long -> IntervalUtils.microsToDuration(datum).toKotlinDuration().let { -// // store in nanos -// it.inWholeNanoseconds shl 1 -// } - else -> throw IllegalArgumentException("Unsupported datum: $datum") - } - -// override fun serialize(obj: Duration): Long = -// IntervalUtils.durationToMicros(obj.toJavaDuration()) - - fun serialize(obj: Long): Long? = - obj?.let { rawValue -> - val unitDiscriminator = rawValue.toInt() and 1 - fun isInNanos() = unitDiscriminator == 0 - val value = rawValue shr 1 - val duration = if (isInNanos()) value.nanoseconds else value.milliseconds - - IntervalUtils.durationToMicros(duration.toJavaDuration()) - } - - override fun serialize(obj: Duration): Long? = - obj?.let { IntervalUtils.durationToMicros(it.toJavaDuration()) } - - - override fun sqlType(): DataType = DayTimeIntervalType.apply() -} \ No newline at end of file +//package org.jetbrains.kotlinx.spark.api.udts +// +//import org.apache.spark.sql.catalyst.util.IntervalUtils +//import org.apache.spark.sql.types.DataType +//import org.apache.spark.sql.types.DayTimeIntervalType +//import org.apache.spark.sql.types.UserDefinedType +//import kotlin.time.Duration +//import kotlin.time.Duration.Companion.milliseconds +//import kotlin.time.Duration.Companion.nanoseconds +//import kotlin.time.toJavaDuration +//import kotlin.time.toKotlinDuration +// +//// TODO Fails, likely because Duration is a value class. +//class DurationUdt : UserDefinedType() { +// +// override fun userClass(): Class = Duration::class.java +// override fun deserialize(datum: Any?): Duration? = +// when (datum) { +// null -> null +// is Long -> IntervalUtils.microsToDuration(datum).toKotlinDuration() +//// is Long -> IntervalUtils.microsToDuration(datum).toKotlinDuration().let { +//// // store in nanos +//// it.inWholeNanoseconds shl 1 +//// } +// else -> throw IllegalArgumentException("Unsupported datum: $datum") +// } +// +//// override fun serialize(obj: Duration): Long = +//// IntervalUtils.durationToMicros(obj.toJavaDuration()) +// +// fun serialize(obj: Long): Long? 
= +// obj?.let { rawValue -> +// val unitDiscriminator = rawValue.toInt() and 1 +// fun isInNanos() = unitDiscriminator == 0 +// val value = rawValue shr 1 +// val duration = if (isInNanos()) value.nanoseconds else value.milliseconds +// +// IntervalUtils.durationToMicros(duration.toJavaDuration()) +// } +// +// override fun serialize(obj: Duration): Long? = +// obj?.let { IntervalUtils.durationToMicros(it.toJavaDuration()) } +// +// +// override fun sqlType(): DataType = DayTimeIntervalType.apply() +//} \ No newline at end of file diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/InstantUdt.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/InstantUdt.kt index 7b8ba110..c6540a66 100644 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/InstantUdt.kt +++ b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/InstantUdt.kt @@ -1,26 +1,24 @@ package org.jetbrains.kotlinx.spark.api.udts import kotlinx.datetime.Instant -import kotlinx.datetime.toJavaInstant -import kotlinx.datetime.toKotlinInstant -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types.DataType import org.apache.spark.sql.types.`TimestampType$` import org.apache.spark.sql.types.UserDefinedType - +import kotlin.time.Duration.Companion.microseconds +import kotlin.time.Duration.Companion.milliseconds class InstantUdt : UserDefinedType() { - override fun userClass(): Class = Instant::class.java + override fun deserialize(datum: Any?): Instant? = when (datum) { null -> null - is Long -> DateTimeUtils.microsToInstant(datum).toKotlinInstant() + is Long -> Instant.fromEpochMilliseconds(datum.microseconds.inWholeMilliseconds) + else -> throw IllegalArgumentException("Unsupported datum: $datum") } - override fun serialize(obj: Instant?): Long? = - obj?.let { DateTimeUtils.instantToMicros(it.toJavaInstant()) } + override fun serialize(obj: Instant?): Long? = obj?.toEpochMilliseconds()?.milliseconds?.inWholeMicroseconds override fun sqlType(): DataType = `TimestampType$`.`MODULE$` -} \ No newline at end of file +} diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/LocalDateTimeUdt.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/LocalDateTimeUdt.kt index 7dd4fa0d..ec3f4547 100644 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/LocalDateTimeUdt.kt +++ b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/LocalDateTimeUdt.kt @@ -1,12 +1,15 @@ package org.jetbrains.kotlinx.spark.api.udts +import kotlinx.datetime.Instant import kotlinx.datetime.LocalDateTime -import kotlinx.datetime.toJavaLocalDateTime -import kotlinx.datetime.toKotlinLocalDateTime -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import kotlinx.datetime.TimeZone +import kotlinx.datetime.toInstant +import kotlinx.datetime.toLocalDateTime import org.apache.spark.sql.types.DataType import org.apache.spark.sql.types.`TimestampNTZType$` import org.apache.spark.sql.types.UserDefinedType +import kotlin.time.Duration.Companion.microseconds +import kotlin.time.Duration.Companion.milliseconds class LocalDateTimeUdt : UserDefinedType() { @@ -15,12 +18,15 @@ class LocalDateTimeUdt : UserDefinedType() { override fun deserialize(datum: Any?): LocalDateTime? 
= when (datum) { null -> null - is Long -> DateTimeUtils.microsToLocalDateTime(datum).toKotlinLocalDateTime() + is Long -> + Instant.fromEpochMilliseconds(datum.microseconds.inWholeMilliseconds) + .toLocalDateTime(TimeZone.UTC) + else -> throw IllegalArgumentException("Unsupported datum: $datum") } override fun serialize(obj: LocalDateTime?): Long? = - obj?.let { DateTimeUtils.localDateTimeToMicros(it.toJavaLocalDateTime()) } + obj?.toInstant(TimeZone.UTC)?.toEpochMilliseconds()?.milliseconds?.inWholeMicroseconds override fun sqlType(): DataType = `TimestampNTZType$`.`MODULE$` } \ No newline at end of file diff --git a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/LocalDateUdt.kt b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/LocalDateUdt.kt index 033b05e5..5816889a 100644 --- a/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/LocalDateUdt.kt +++ b/kotlin-spark-api/src/main/kotlin/org/jetbrains/kotlinx/spark/api/udts/LocalDateUdt.kt @@ -1,26 +1,21 @@ package org.jetbrains.kotlinx.spark.api.udts import kotlinx.datetime.LocalDate -import kotlinx.datetime.toJavaLocalDate -import kotlinx.datetime.toKotlinLocalDate -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types.DataType import org.apache.spark.sql.types.`DateType$` import org.apache.spark.sql.types.UserDefinedType - class LocalDateUdt : UserDefinedType() { - override fun userClass(): Class = LocalDate::class.java + override fun deserialize(datum: Any?): LocalDate? = when (datum) { null -> null - is Int -> DateTimeUtils.daysToLocalDate(datum).toKotlinLocalDate() + is Int -> LocalDate.fromEpochDays(datum) else -> throw IllegalArgumentException("Unsupported datum: $datum") } - override fun serialize(obj: LocalDate?): Int? = - obj?.let { DateTimeUtils.localDateToDays(it.toJavaLocalDate()) } + override fun serialize(obj: LocalDate?): Int? = obj?.toEpochDays() override fun sqlType(): DataType = `DateType$`.`MODULE$` -} \ No newline at end of file +} diff --git a/spark-connect-examples/build.gradle.kts b/spark-connect-examples/build.gradle.kts index c1f20c0a..1f62c6fa 100644 --- a/spark-connect-examples/build.gradle.kts +++ b/spark-connect-examples/build.gradle.kts @@ -31,15 +31,19 @@ repositories { dependencies { Projects { implementation( - // TODO kotlinSparkApi, + kotlinSparkApi, ) } Dependencies { +// implementation(hadoopClient) + // IMPORTANT! 
compileOnly(sparkSqlApi) implementation(sparkConnectClient) + + implementation(kotlinDateTime) } } diff --git a/spark-connect-examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt b/spark-connect-examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt index 790bad24..e81f261d 100644 --- a/spark-connect-examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt +++ b/spark-connect-examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt @@ -1,27 +1,48 @@ package org.jetbrains.kotlinx.spark.examples -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connect.client.REPLClassDirMonitor +import org.jetbrains.kotlinx.spark.api.plugin.annotations.Sparkify +import org.jetbrains.kotlinx.spark.api.showDS +import org.jetbrains.kotlinx.spark.api.toList +import org.jetbrains.kotlinx.spark.api.tuples.X +import org.jetbrains.kotlinx.spark.api.tuples.t +import org.jetbrains.kotlinx.spark.api.withSparkConnect +import scala.Tuple2 +import java.time.LocalDate // run with `./gradlew run` or set VM options: "--add-opens=java.base/java.nio=ALL-UNNAMED" in the IDE -fun main() { - val spark = - SparkSession - .builder() - .remote("sc://localhost") - .create() +fun main() = + withSparkConnect("sc://localhost") { + val classFinder = REPLClassDirMonitor("/mnt/data/Projects/kotlin-spark-api/spark-connect-examples/build/classes") + spark.registerClassFinder(classFinder) + spark.addArtifact("/mnt/data/Projects/kotlin-spark-api/spark-connect-examples/build/libs/spark-connect-examples-2.0.0-SNAPSHOT.jar") - val classFinder = REPLClassDirMonitor("/mnt/data/Projects/kotlin-spark-api/spark-connect-examples/build/classes") - spark.registerClassFinder(classFinder) - spark.addArtifact("/mnt/data/Projects/kotlin-spark-api/spark-connect-examples/build/libs/spark-connect-examples-2.0.0-SNAPSHOT.jar") + val data = + listOf( + Person("Alice", 25, LocalDate.of(1996, 1, 1), "Alice" X Address("1 Main St", "Springfield", "IL", 62701)), + Person("Bob", 30, LocalDate.of(1991, 1, 1), "Bob" X Address("2 Main St", "Springfield", "IL", 62701)), + Person("Charlie", 35, LocalDate.of(1986, 1, 1), "Charlie" X Address("3 Main St", "Springfield", "IL", 62701)), + ) - spark.sql("select 1").show() + val ds = data.toDS().showDS() - spark.stop() -} + ds.toList().forEach { + println(it) + } + } -//@Sparkify -//data class Person( -// val name: String, -// val age: Int, -//) +@Sparkify +data class Address( + val street: String, + val city: String, + val state: String, + val zip: Int, +) + +@Sparkify +data class Person( + val name: String, + val age: Int, + val birthDate: LocalDate, + val tuple: Tuple2, +)
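
A minimal usage sketch of the new `withSparkConnect` entry point introduced in this change, assuming a Spark Connect server is reachable at "sc://localhost" and the Sparkify compiler plugin is applied; the `Fruit` data class and the `spark.sql.session.timeZone` option are illustrative assumptions, not part of this PR:

    // Run with --add-opens=java.base/java.nio=ALL-UNNAMED, as noted for Main.kt above.
    import org.jetbrains.kotlinx.spark.api.plugin.annotations.Sparkify
    import org.jetbrains.kotlinx.spark.api.showDS
    import org.jetbrains.kotlinx.spark.api.withSparkConnect

    // Hypothetical data class, used only for this sketch.
    @Sparkify
    data class Fruit(val name: String, val weight: Double)

    fun main() =
        withSparkConnect(
            remote = "sc://localhost",                            // default remote of the new API
            props = mapOf("spark.sql.session.timeZone" to "UTC"), // illustrative extra option
        ) { // this: KSparkSession
            // List.toDS() is provided by KSparkSession, as kept in this diff.
            val ds = listOf(Fruit("apple", 150.0), Fruit("banana", 120.0)).toDS()
            ds.showDS()
        }

Extra `props` entries are merged on top of the default `spark.sql.legacy.allowUntypedScalaUDF=true` set by `withSparkConnect`, and the session is stopped automatically when the block returns.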