-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdecisionTree.py
132 lines (120 loc) · 5.72 KB
/
decisionTree.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import numpy
import pandas
import random
def trainTestSplit(dataFrame, testSize):
    """Randomly partition *dataFrame* into a train and a test DataFrame.

    testSize may be a float (fraction of rows, rounded) or an int
    (absolute number of test rows). Returns (train, test).
    """
    sampleCount = testSize
    if isinstance(sampleCount, float):
        # Interpret a float as a fraction of the full frame.
        sampleCount = round(sampleCount * len(dataFrame))
    allIndices = dataFrame.index.tolist()
    heldOutIndices = random.sample(population = allIndices, k = sampleCount)
    testFrame = dataFrame.loc[heldOutIndices]
    trainFrame = dataFrame.drop(heldOutIndices)
    return trainFrame, testFrame
def checkPurity(data):
    """Return True when all samples in *data* share one class label.

    data is a 2-D array whose last column holds the class label.
    Idiom fix: return the comparison directly instead of the
    `if cond: return True / else: return False` pattern.
    """
    return len(numpy.unique(data[:, -1])) == 1
def classifyData(data):
    """Return the majority class label of *data* (last column).

    Ties resolve to the first label returned by numpy.unique
    (argmax picks the first maximal count).
    """
    labels, labelCounts = numpy.unique(data[:, -1], return_counts = True)
    majorityIndex = numpy.argmax(labelCounts)
    return labels[majorityIndex]
def getPotentialSplits(data, randomAttributes):
    """Collect candidate split thresholds per feature column.

    For each feature column (all but the last, which is the label),
    candidates are the midpoints between consecutive sorted unique
    values. If *randomAttributes* is a list of column indices, only
    those columns are considered (random-forest attribute sampling).

    Fixes: `!= None` replaced with `is not None`; the manual
    index/append loop replaced with a comprehension.

    NOTE: a column with a single distinct value keeps the raw
    numpy array of that one value (original behavior preserved);
    all other columns map to a plain list of midpoints.
    """
    potentialSplits = {}
    _, columns = data.shape
    columnsIndices = list(range(columns - 1))
    if randomAttributes is not None and len(randomAttributes) <= len(columnsIndices):
        columnsIndices = randomAttributes
    for column in columnsIndices:
        uniqueValues = numpy.unique(data[:, column])
        if len(uniqueValues) == 1:
            potentialSplits[column] = uniqueValues
        else:
            # Midpoints between each adjacent pair of sorted unique values.
            potentialSplits[column] = [
                (uniqueValues[i] + uniqueValues[i - 1]) / 2
                for i in range(1, len(uniqueValues))
            ]
    return potentialSplits
def splitData(data, splitColumn, splitValue):
    """Partition rows of *data* on one column against a threshold.

    Returns (rowsBelow, rowsAbove): rows with column value <= splitValue,
    then rows with column value > splitValue.
    """
    belowSelector = data[:, splitColumn] <= splitValue
    aboveSelector = data[:, splitColumn] > splitValue
    return data[belowSelector], data[aboveSelector]
def calculateEntropy(data):
    """Shannon entropy (base 2) of the class label column of *data*."""
    _, classCounts = numpy.unique(data[:, -1], return_counts = True)
    classProbabilities = classCounts / classCounts.sum()
    # H = -sum(p * log2(p)); counts are > 0, so log2 never sees zero.
    return -(classProbabilities * numpy.log2(classProbabilities)).sum()
def calculateOverallEntropy(dataBelow, dataAbove):
    """Size-weighted average entropy of the two split partitions."""
    totalSamples = len(dataBelow) + len(dataAbove)
    weightBelow = len(dataBelow) / totalSamples
    weightAbove = len(dataAbove) / totalSamples
    return (weightBelow * calculateEntropy(dataBelow)
            + weightAbove * calculateEntropy(dataAbove))
def determineBestSplit(data, potentialSplits, randomSplits = None):
    """Pick the (column, value) split minimizing weighted entropy.

    With randomSplits = None every candidate in *potentialSplits* is
    evaluated exhaustively; otherwise *randomSplits* candidates are
    drawn at random (extra-trees style).

    Fixes: the magic sentinel 9999 is replaced by float("inf") — with
    many classes the weighted entropy (log2 of class count) is unbounded,
    so a finite sentinel could reject every candidate; `== None` replaced
    with `is None`.
    """
    overallEntropy = float("inf")
    bestSplitColumn = 0
    bestSplitValue = 0
    if randomSplits is None:
        # Exhaustive search over every candidate threshold.
        for splitColumn in potentialSplits:
            for splitValue in potentialSplits[splitColumn]:
                dataBelow, dataAbove = splitData(data, splitColumn, splitValue)
                currentOverallEntropy = calculateOverallEntropy(dataBelow, dataAbove)
                if currentOverallEntropy <= overallEntropy:
                    overallEntropy = currentOverallEntropy
                    bestSplitColumn = splitColumn
                    bestSplitValue = splitValue
    else:
        # Randomized search: sample candidate (column, value) pairs.
        for i in range(randomSplits):
            randomSplitColumn = random.choice(list(potentialSplits))
            randomSplitValue = random.choice(potentialSplits[randomSplitColumn])
            dataBelow, dataAbove = splitData(data, randomSplitColumn, randomSplitValue)
            currentOverallEntropy = calculateOverallEntropy(dataBelow, dataAbove)
            if currentOverallEntropy <= overallEntropy:
                overallEntropy = currentOverallEntropy
                bestSplitColumn = randomSplitColumn
                bestSplitValue = randomSplitValue
    return bestSplitColumn, bestSplitValue
def buildDecisionTree(dataFrame, currentDepth = 0, minSampleSize = 2, maxDepth = 1000, randomAttributes = None, randomSplits = None):
    # Recursively build a decision tree.
    #
    # Top-level call (currentDepth == 0) takes a pandas DataFrame whose last
    # column is the class label; recursive calls receive a raw numpy array.
    # The tree is a nested dict {"<col> <= <value>": [yesSubtree, noSubtree]};
    # leaves are bare class labels.
    #
    # randomAttributes: int count of feature columns to sample per tree
    # (random-forest style); randomSplits: int count of random candidate
    # splits per node (passed through to determineBestSplit).
    if currentDepth == 0:
        # Root call: remember column names globally so recursive calls
        # (which only see numpy arrays) can still name split columns.
        # NOTE(review): global state makes concurrent tree builds unsafe.
        global COLUMN_HEADERS
        COLUMN_HEADERS = dataFrame.columns
        data = dataFrame.values
        # If an attribute count was given, convert it into a concrete random
        # sample of column indices; an oversized count silently disables it.
        if randomAttributes != None and randomAttributes <= len(COLUMN_HEADERS) - 1:
            randomAttributes = random.sample(population = list(range(len(COLUMN_HEADERS) - 1)), k = randomAttributes)
        else:
            randomAttributes = None
    else:
        # Recursive call: dataFrame is already a numpy array.
        data = dataFrame
    # Stop and emit a leaf when the node is pure, too small, or too deep.
    if checkPurity(data) or len(data) < minSampleSize or currentDepth == maxDepth:
        return classifyData(data)
    else:
        currentDepth += 1
        potentialSplits = getPotentialSplits(data, randomAttributes)
        splitColumn, splitValue = determineBestSplit(data, potentialSplits, randomSplits)
        dataBelow, dataAbove = splitData(data, splitColumn, splitValue)
        # Degenerate split (all rows on one side): fall back to a leaf.
        if len(dataBelow) == 0 or len(dataAbove) == 0:
            return classifyData(data)
        else:
            # The question string is parsed back by classifySample via
            # split(" <= "), so its exact format matters.
            question = str(COLUMN_HEADERS[splitColumn]) + " <= " + str(splitValue)
            decisionSubTree = {question: []}
            yesAnswer = buildDecisionTree(dataBelow, currentDepth, minSampleSize, maxDepth, randomAttributes, randomSplits)
            noAnswer = buildDecisionTree(dataAbove, currentDepth, minSampleSize, maxDepth, randomAttributes, randomSplits)
            # Collapse a split whose children agree into a single leaf.
            if yesAnswer == noAnswer:
                decisionSubTree = yesAnswer
            else:
                decisionSubTree[question].append(yesAnswer)
                decisionSubTree[question].append(noAnswer)
            return decisionSubTree
def classifySample(sample, decisionTree):
    """Walk *decisionTree* down to a leaf label for one *sample*.

    Tree nodes are dicts {"<attribute> <= <value>": [yesBranch, noBranch]};
    any non-dict node is a leaf label. *sample* must support item access
    by attribute name (pandas Series or dict).
    """
    node = decisionTree
    # Iterative descent instead of recursion: a dict is an internal node.
    while isinstance(node, dict):
        question = next(iter(node))
        attribute, threshold = question.split(" <= ")
        branches = node[question]
        node = branches[0] if sample[attribute] <= float(threshold) else branches[1]
    return node
def decisionTreePredictions(dataFrame, decisionTree):
    """Classify every row of *dataFrame* with *decisionTree*.

    Returns a pandas Series of predicted labels, indexed like dataFrame.
    """
    return dataFrame.apply(lambda row: classifySample(row, decisionTree), axis = 1)
def calculateAccuracy(predictedResults, category):
    """Fraction of predictions equal to the true labels in *category*."""
    # Elementwise equality gives a boolean Series; its mean is the accuracy.
    return (predictedResults == category).mean()