postalCodes.scala
package nl.utwente.bigdata; // don't change package name
import org.apache.spark.SparkContext._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.rdd.RDD
import magellan.{Point, Polygon, PolyLine}
import org.apache.spark.sql.magellan.dsl.expressions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import scala.collection.immutable.ListMap
// this program uses Spark SQL
import org.apache.spark.sql.SQLContext
object postalCodes {
case class TwitterRecord(userId: String, point: Point)
case class PostalCodesList(userId: String, postalCodes: Iterable[String])
case class PostalCodesMap(userId: String, postalCodes: Map[String, Int])
case class PostalCodesListMap(userId: String, postalCodes: ListMap[String, Int])
case class PostalCode(userId: String, postalCode: String)
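// TwitterRecord is the row type of the tweets DataFrame built in doJob
// (one geotagged tweet per row); the other case classes describe
// intermediate shapes of the per-user aggregation and are not used by
// the current pipeline.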
/*
 * doJob: for a DataFrame of raw tweets, determine each geotagged user's
 * most frequent postal code, then count the number of users per postal
 * code. Returns an RDD[Row] of (postalCode, #users) pairs.
 */
def doJob(twitter: org.apache.spark.sql.DataFrame, sqlContext: SQLContext) : RDD[org.apache.spark.sql.Row] = {
import sqlContext.implicits._
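// the implicits make .toDF() available on the RDD[TwitterRecord] built below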
/* Tweets with a geotag: keep the user id and the GeoJSON coordinates.
   Twitter's coordinates array is [longitude, latitude]. */
val tweets = twitter.filter(twitter("coordinates").isNotNull)
  .select(twitter("user.id"), twitter("coordinates.type"),
          twitter("coordinates.coordinates")(0).as("longitude"),
          twitter("coordinates.coordinates")(1).as("latitude"))
  .map { line =>
    val userId = line(0).toString
    // magellan's Point takes (x, y) = (longitude, latitude)
    val point = Point(line(2).toString.toDouble, line(3).toString.toDouble)
    TwitterRecord(userId, point)
  }
  .repartition(100).toDF()
  .cache()
/* Read the postal code polygons via the magellan data source: each row
   carries a polygon plus a metadata map of shapefile attributes, where
   the PC4 attribute is the 4-digit postal code. */
val loadedPostcodes = sqlContext.read.format("magellan").load("WSG/")
val postcodes = loadedPostcodes.select(loadedPostcodes("polygon"),
    org.apache.spark.sql.functions.explode(loadedPostcodes("metadata")).as(Seq("k", "v")))
  .cache()
val filteredPostcodes = postcodes.where(postcodes("k") === "PC4")
  .withColumnRenamed("v", "postalcode")
  .drop("k")
  .cache()
/* Combine the data: spatial join keeping every (postalcode, userId)
   pair where the tweet's point lies within the postal code polygon. */
val joined = filteredPostcodes.join(tweets)
  .where(tweets("point") within filteredPostcodes("polygon"))
  .drop("polygon").drop("point")
  .cache()
/* Determine the postal code for each user: collect all matched postal
   codes per user and keep the most frequent one, yielding rows of
   (postalCode, userId). TODO: filter postal codes. */
val user_with_postalcode = joined.rdd
  .map(x => (x(1), x(0)))   // joined rows are (postalcode, userId)
  .groupByKey()
  .map(x => Row(x._2.groupBy(identity).mapValues(_.size).maxBy(_._2)._1, x._1))
  .cache()
/* Count users per postal code: rows of user_with_postalcode are
   (postalCode, userId), so counting keys counts the users per code. */
val users_per_postalcode = user_with_postalcode.map(x => (x(0), 1))
  .reduceByKey(_ + _)
  .map(x => Row(x._1, x._2))   // one Row(postalCode, #users) per code
  .cache()
users_per_postalcode.repartition(1)
}
def main(args: Array[String]) {
// application name
val appName = this.getClass.getName
// interpret command line: first argument is the input directory, second is the output directory
val inputDir = args(0)
val outputDir = args(1)
// configuration
val conf = new SparkConf()
.setAppName(s"$appName $inputDir $outputDir")
// create spark context
val sc = new SparkContext(conf)
// create the SQL context (this program uses Spark SQL)
val sqlContext = new SQLContext(sc)
// read the tweets as JSON, run the job, and write the result as text
doJob(sqlContext.read.json(inputDir), sqlContext).saveAsTextFile(outputDir)
}
}
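/* Example submission (illustrative only: the jar name, the magellan package
   version and the input/output paths depend on your own build and cluster):

   spark-submit --class nl.utwente.bigdata.postalCodes \
     --packages harsha2010:magellan:1.0.3-s_2.10 \
     postalcodes.jar  /data/tweets  /data/out
*/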