SReadPartitionAware_Mismatch.scala

package examples

import java.io.IOException

import edb.client.DBClient
import edb.common.{ExistingTableException, Row, Schema, UnknownTableException}
import edb.server.DBServer
import examples.utils.RDDUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum}

import scala.collection.JavaConverters._

object SReadPartitionAware_Mismatch {

  @throws[IOException]
  @throws[InterruptedException]
  @throws[ExistingTableException]
  @throws[UnknownTableException]
  def main(args: Array[String]): Unit = {

    val serverHost = "localhost"
    val serverPort = 50199

    val server = new DBServer(serverPort)
    server.start()
    System.out.println("*** Example database server started")

    //
    // Since this DataSource doesn't support writing, we need to populate
    // ExampleDB with some data.
    //
    val schema = new Schema
    schema.addColumn("g", Schema.ColumnType.STRING)
    schema.addColumn("u", Schema.ColumnType.INT64)

    val client = new DBClient(serverHost, serverPort)
    // This time the table is not clustered on any column
    client.createTable("myTable", schema)

    val toInsert = for {
      i <- (0 to 19).toList
    } yield {
      val r = new Row
      r.addField(new Row.StringField("g", "G_" + (i % 4)))
      r.addField(new Row.Int64Field("u", i * 100))
      r
    }
    client.bulkInsert("myTable", toInsert.asJava)
    System.out.println("*** Example database server populated with data")

    //
    // By default this data source supports creating Datasets with four partitions.
    //
    val dataSourceName = "datasources.PartitioningRowDataSource"

    val spark = SparkSession.builder
      .appName("SReadPartitionAware-Mismatch")
      .master("local[4]")
      .getOrCreate
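
    // "local[4]" runs Spark in-process with four worker threads, so the four
    // partitions the source creates by default could all be read in parallel.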

    //
    // This is where we read from our DataSource. Notice how we use the
    // fully qualified class name and provide the information needed to connect to
    // ExampleDB using options. We specify two partitions so that, had the table been
    // clustered on the grouping column, each partition could have held two of the four
    // groups. But the table wasn't set up with that column clustered, so a shuffle
    // will be needed.
    //
    val data = spark.read
      .format(dataSourceName)
      .option("host", serverHost)
      .option("port", serverPort)
      .option("table", "myTable")
      .option("partitions", 2) // number of partitions specified here
      .load
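
    // As a quick sanity check, the number of partitions the source actually produced
    // can be inspected directly; with the "partitions" option above it should be 2.
    System.out.println("*** Partitions after load: " + data.rdd.getNumPartitions)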
System.out.println("*** Schema: ")
data.printSchema()
System.out.println("*** Data: ")
data.show()
RDDUtils.analyze(data)

    //
    // The following aggregation query needs all the rows for each group to end up in
    // the same partition so that the groups can be aggregated in parallel. The optimizer
    // will insert a potentially expensive shuffle to achieve this unless the data source
    // tells it that the necessary partitioning is already in place.
    //
    val aggregated = data.groupBy(col("g")).agg(sum(col("u")))
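
    // The shuffle the optimizer inserts can be seen in the physical plan, which
    // should contain an Exchange (hash partitioning on g) operator.
    aggregated.explain()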

    //
    // Note: since a shuffle was required, the resulting table has the usual default
    // number of partitions -- 200 as of Spark 2.3.0.
    //
    System.out.println("*** Query result: ")
    aggregated.show()

    RDDUtils.analyze(aggregated)
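
    // The 200 comes from the "spark.sql.shuffle.partitions" setting; lowering it
    // (e.g. spark.conf.set("spark.sql.shuffle.partitions", "8")) would shrink the
    // post-shuffle partition count for a small example like this one.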

    spark.stop()

    server.stop()
  }
}