-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathPreprocess.py
32 lines (28 loc) · 1.64 KB
/
Preprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import sys
from pyspark import SparkContext, SparkConf, sql
from pyspark.ml.classification import LogisticRegression
from functools import reduce
if __name__ == "__main__":
# create Spark context with Spark configuration
conf = SparkConf().setAppName("Practica 4 - O.J.F.")
sc = SparkContext(conf=conf)
sqlContext = sql.SQLContext(sc)
# Obtenemos una lista con las cabeceras para poder componer
# el DF posteriormente y seleccionar las que nos interesen.
headerFile = sc.textFile("/user/datasets/ecbdl14/ECBDL14_IR2.header").collect()
headerFiltered = filter(lambda line: "@attribute" in line ,headerFile)
headers = list(map(lambda line: line.split()[1], headerFiltered))
# Leemos el DF con los datos y renombramos las columnas.
df = sqlContext.read.csv("/user/datasets/ecbdl14/ECBDL14_IR2.data",header=False,sep=",",inferSchema=True)
dfRenamed = reduce(lambda data, idx: data.withColumnRenamed(df.schema.names[idx], headers[idx]), range(len(df.schema.names)), df)
# Obtenemos el nuevo DF con las columnas asignadas
dfRenamed.createOrReplaceTempView("sql_dataset")
sqlDF = sqlContext.sql('SELECT PredSA_freq_global_0, `PredSA_central_-2`, PSSM_r1_3_V, PSSM_r1_2_I, PSSM_r1_2_W, `PSSM_r2_-4_Y`, class FROM sql_dataset')
#Guardamos el DF filtrado para poder cargarlo en futuras ejecuciones a la hora
#de probar los diferentes modelos de entrenamiento
sqlDF.write.format('csv').option('header',True).save('./filteredC.small.training')
#sqlDF.show()
#lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)
#lrModel = lr.fit(sqlDF)
#lrModel.summary()
sc.stop()