-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtransform.py
64 lines (55 loc) · 1.93 KB
/
transform.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Reuse the active SparkSession if there is one; otherwise build a default one.
spark = SparkSession.builder.getOrCreate()
# Load the raw profile records.
# NOTE(review): Spark's JSON source expects JSON Lines (one object per line)
# unless the multiLine option is enabled — confirm sample.json matches.
df = spark.read.format("json").load("sample.json")
def _eparams(rec):
degrees, schools, fields_of_study = [], [], []
for row in rec:
if row["degree_name"] and row["school"] and row["field_of_study"]:
degrees.append(row["degree_name"])
schools.append(row["school"])
fields_of_study.append(row["field_of_study"])
degrees = [x.strip() for x in degrees]
schools = [x.strip() for x in schools]
fields_of_study = [x.strip() for x in fields_of_study]
numDegrees = len(degrees)
if numDegrees > 0:
recent_degree = degrees[0]
recent_field_of_study = fields_of_study[0]
recent_school = schools[0]
return (
str(numDegrees),
recent_degree,
recent_field_of_study,
recent_school,
"|".join(degrees[1:]),
"|".join(fields_of_study[1:]),
"|".join(schools[1:]),
)
else:
return (str(0), None, None, None, None, None, None)
# Spark UDF wrapping _eparams; its 7-element result is declared as
# array<string>. A null `education` value reaches the lambda as None and is
# mapped to an empty list, so such rows yield numDegrees "0" instead of
# failing the task with TypeError from list(None).
udf_edu = udf(
    lambda x: _eparams(list(x) if x is not None else []),
    ArrayType(StringType()),
)
# Flatten the languages array into one space-separated string.
df = df.withColumn("languages", concat_ws(" ", col("languages")))
# Attach the 7-element education summary produced by udf_edu.
# NOTE(review): depending on the Spark version, the optimizer may inline the
# UDF into each of the seven extractions below — check df.explain() if the
# UDF appears to run more than once per row.
df = df.withColumn("edu_params", udf_edu(col("education")))
# Keep the profile columns and split edu_params into one named column per
# array position (position i gets the i-th alias).
df = df.select(
    "public_identifier",
    "full_name",
    "headline",
    "summary",
    "country",
    "country_full_name",
    "city",
    "state",
    "languages",
    *[
        col("edu_params")[position].alias(alias)
        for position, alias in enumerate(
            (
                "numDegrees",
                "recent_degree",
                "recent_field_of_study",
                "recent_school",
                "previous_degrees",
                "previous_fields_of_study",
                "previous_schools",
            )
        )
    ],
)
df.show(3, truncate=False)