-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathkmeans_gaze_correction.py
196 lines (165 loc) · 7.73 KB
/
kmeans_gaze_correction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# -*- coding: utf-8 -*-
'''
Pupil Player Third Party Plugins by cpicanco
Copyright (C) 2016 Rafael Picanço.
The present file is distributed under the terms of the GNU General Public License (GPL v3.0).
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
# modified from manual_gaze_correction
import cv2
from copy import deepcopy
from plugin import Plugin
import numpy as np
from methods import normalize
from pyglui import ui
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# todo: test https://en.wikipedia.org/wiki/DBSCAN, or alternative clustering algorithms.
class KMeans_Gaze_Correction(Plugin):
"""
Correct gaze by:
-1) assuming equaly distributed k stimuli at a detected screen (in the world frame);
0) screen center and gaze points are given from homographic trasformation
1) grouping gaze in blocks with arbitrary size;
2) infering bias from the difference between the mean k-means clustering and the screen center
3) correction is the screen center subtracted from bias
"""
def __init__(self,g_pool):
super(KMeans_Gaze_Correction, self).__init__(g_pool)
self.g_pool = g_pool
self.order = .3
self.menu = None
self.untouched_gaze_positions_by_frame = None
self.bias_by_frame = None
self.screen_detector = None
for p in self.g_pool.plugins:
if p.class_name == 'Offline_Screen_Detector':
if p.alive:
self.screen_detector = p
break
if not self.screen_detector:
self.alive = False
logger.error('Open the Offline Screen Detector.')
else:
self.gaze_correction()
def _load_untouched_gaze(self):
if self.untouched_gaze_positions_by_frame:
self.g_pool.gaze_positions_by_frame = self.untouched_gaze_positions_by_frame
self.notify_all({'subject':'gaze_positions_changed'})
def _get_bias_by_frame(self):
def locate_m_to_screen(surface,frame_idx):
m_to_screen = None
cache = surface.cache[frame_idx]
if cache == False: #cached data not avaible for this frame
return False
elif cache == None: #cached data shows surface not found:
return False
else:
m_to_screen = cache['m_to_screen']
return m_to_screen
def surface_to_screen(pos, m_to_screen):
if m_to_screen is not None:
#convenience lines to allow 'simple' vectors (x,y) to be used
shape = pos.shape
pos.shape = (-1,1,2)
new_pos = cv2.perspectiveTransform(pos,m_to_screen)
new_pos.shape = shape
return new_pos
else:
return None
# load dependencies
bias_along_blocks = None
if self.screen_detector.surfaces:
s = self.screen_detector.surfaces[0]
if s.output_data:
try:
bias_along_blocks = self.screen_detector.surfaces[0].output_data['bias_along_blocks']
except Exception, e:
logger.error('Bias data not found. Error:%s'%e) # this should never happen
else:
logger.error('Correction data not found (Did you press recalculate?).')
else:
logger.error('Screen not found. Define a screen/surface first.')
if not bias_along_blocks:
self.alive = False
return False
# create an empty array with the same structure as gaze positions by frame
bias_by_frame = [[] for frame in self.g_pool.gaze_positions_by_frame]
for f in range(0,len(bias_by_frame)):
bias_by_frame[f] = [[] for gaze in self.g_pool.gaze_positions_by_frame[f]]
unbiased_gaze = s.output_data['unbiased_gaze']
indexed_bias = [[] for gaze in unbiased_gaze]
# fill with data
for b in bias_along_blocks:
for i,_ in enumerate(indexed_bias[b['block'][0]:b['block'][1]]):
indexed_bias[i] = b['bias']
for index, d in enumerate(unbiased_gaze):
bias = indexed_bias[index]
f = d['frame']
i = d['i']
bias_by_frame[f][i] = bias
# we must have at least the first gaze to continue no matter what happend before this line
if len(bias_by_frame[0][0]) < 2:
bias_by_frame[0][0] = bias_along_blocks[0]['bias']
# normalize and reverse homographic transformation
x_size, y_size = s.real_world_size['x'], s.real_world_size['y']
for f in range(len(self.g_pool.gaze_positions_by_frame)):
for i in range(len(self.g_pool.gaze_positions_by_frame[f])):
if len(bias_by_frame[f][i]) == 2:
bias_by_frame[f][i] = normalize(bias_by_frame[f][i], (x_size,y_size), True)
#bias_by_frame[f][i] = s.ref_surface_to_img(np.array(bias_by_frame[f][i]))
#m_to_screen = locate_m_to_screen(s, f)
#bias_by_frame[f][i] = surface_to_screen(np.array(bias_by_frame[f][i]), m_to_screen)
#bias_by_frame[f][i] = bias_by_frame[f][i]
last_bias = bias_by_frame[f][i]
else:
# finally, we apply the bias correction to all gaze, not only to the filtered ones
bias_by_frame[f][i] = last_bias
self.bias_by_frame = bias_by_frame
return True
def _set_gaze_correction(self):
def apply_correction(f, i):
x_bias, y_bias = self.bias_by_frame[f][i]
gaze_pos = self.untouched_gaze_positions_by_frame[f][i]['norm_pos']
gaze_pos = gaze_pos[0]+(x_bias), gaze_pos[1]+(y_bias)
self.g_pool.gaze_positions_by_frame[f][i]['norm_pos'] = gaze_pos
not_corrected = 0
for f in range(len(self.g_pool.gaze_positions_by_frame)):
for i in range(len(self.g_pool.gaze_positions_by_frame[f])):
if len(self.bias_by_frame[f][i]) == 2:
apply_correction(f, i)
else:
not_corrected += 1
if not_corrected > 0:
logger.info('{0} gaze points were not corrected. The higher the worse.'.format(not_corrected))
self.notify_all_delayed({'subject':'gaze_positions_changed'})
def gaze_correction(self):
self.untouched_gaze_positions_by_frame = deepcopy(self.g_pool.gaze_positions_by_frame)
if self._get_bias_by_frame():
self._set_gaze_correction()
def init_gui(self):
# initialize the menu
self.menu = ui.Scrolling_Menu('Gaze Correction (k-means)')
self.g_pool.gui.append(self.menu)
self.menu.append(ui.Button('Close',self.unset_alive))
self.menu.append(ui.Info_Text('This plugin applies to all gaze the bias found by the offline screen detector plugin.'))
def deinit_gui(self):
if self.menu:
self.g_pool.gui.remove(self.menu)
self.menu = None
def load_untouched_gaze_no_notify(self):
if self.untouched_gaze_positions_by_frame:
self.g_pool.gaze_positions_by_frame = self.untouched_gaze_positions_by_frame
def unset_alive(self):
self.alive = False
#def get_init_dict(self):
# return {'x_offset':self.x_offset,'y_offset':self.y_offset}
def cleanup(self):
""" called when the plugin gets terminated.
This happens either voluntarily or forced.
if you have a GUI or glfw window destroy it here.
"""
self._load_untouched_gaze()
self.deinit_gui()