-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathevaluation.py
116 lines (106 loc) · 5.25 KB
/
evaluation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
''' Evaluation of agent trajectories '''
import json
from collections import defaultdict
import networkx as nx
import numpy as np
import pprint
pp = pprint.PrettyPrinter(indent=4)
from utils import load_datasets, load_nav_graphs, ndtw_graphload, DTW
class Evaluation(object):
    ''' Score agent trajectories against ground-truth navigation paths.

    Results submission format:
        [{'instr_id': string, 'trajectory':[(viewpoint_id, heading_rads, elevation_rads),] } ]
    '''

    def __init__(self, splits, scans):
        ''' Load ground truth for `splits` and precompute shortest-path distances.

        splits: iterable of split names; each one is passed to `load_datasets`.
        scans:  optional container of scan ids. When it is not None, dataset
                items whose 'scan' is not in it are skipped.
        '''
        self.error_margin = 3.0  # errors strictly below this count as a success
        self.splits = splits
        self.gt = {}  # str(path_id) -> dataset item
        instr_ids = set()
        used_scans = set()
        for split in splits:
            # Splits whose name contains "72" carry a ready-made 'instr_id' on
            # every item; for all other splits one id per instruction is
            # derived as '<path_id>_<instruction index>'.
            has_own_ids = "72" in split
            for item in load_datasets([split]):
                if scans is not None and item['scan'] not in scans:
                    continue
                self.gt[str(item['path_id'])] = item
                used_scans.add(item['scan'])
                if has_own_ids:
                    instr_ids.add(item['instr_id'])
                else:
                    instr_ids.update(
                        '%s_%d' % (item['path_id'], i)
                        for i in range(len(item['instructions']))
                    )
        self.scans = used_scans
        self.instr_ids = instr_ids
        self.graphs = load_nav_graphs(self.scans)
        # scan -> {source viewpoint -> {target viewpoint -> shortest path length}}
        self.distances = {
            scan: dict(nx.all_pairs_dijkstra_path_length(graph))
            for scan, graph in self.graphs.items()
        }

    def _get_nearest(self, scan, goal_id, path):
        ''' Return the viewpoint id on `path` with the smallest distance to `goal_id`.

        `path` is a non-empty sequence whose entries hold the viewpoint id at
        index 0. When several entries are equally close, the earliest one wins.
        '''
        to_goal = self.distances[scan]
        near_id = path[0][0]
        near_d = to_goal[near_id][goal_id]
        for step in path:
            d = to_goal[step[0]][goal_id]
            if d < near_d:  # strict comparison keeps the first minimum
                near_id = step[0]
                near_d = d
        return near_id

    def _score_item(self, instr_id, path):
        ''' Calculate error based on the final position in trajectory, and also
            the closest position (oracle stopping rule).
            The path contains [view_id, angle, vofv]

        Appends one entry per metric to the lists in `self.scores`, which
        must already exist (it is created by `score`).
        Raises AssertionError when the trajectory does not begin at the
        ground-truth start viewpoint.
        '''
        # The ground-truth key is the second-to-last '_'-separated field of the id.
        gt = self.gt[instr_id.split('_')[-2]]
        start = gt['path'][0]
        assert start == path[0][0], 'Result trajectories should include the start position'
        goal = gt['path'][-1]
        final_position = path[-1][0]  # the first of [view_id, angle, vofv]
        nearest_position = self._get_nearest(gt['scan'], goal, path)
        scan_dist = self.distances[gt['scan']]

        self.scores['nav_errors'].append(scan_dist[final_position][goal])
        self.scores['oracle_errors'].append(scan_dist[nearest_position][goal])
        self.scores['trajectory_steps'].append(len(path) - 1)
        # Length of the path in meters: sum of distances between consecutive viewpoints.
        distance = sum(
            scan_dist[prev[0]][curr[0]] for prev, curr in zip(path, path[1:])
        )
        self.scores['trajectory_lengths'].append(distance)
        self.scores['shortest_lengths'].append(scan_dist[start][goal])
        self.scores['instr_ids'].append(instr_id)
        self.scores['scan'].append(gt['scan'])

    def score(self, output_file):
        ''' Evaluate each agent trajectory based on how close it got to the goal location.

        output_file: either a path (str) to a JSON results file, or the
                     already-loaded list of result dicts.

        Returns (score_summary, self.scores): `score_summary` maps
        'nav_error', 'oracle_error', 'steps', 'lengths', 'success_rate',
        'oracle_rate' and 'spl' to their averages over the scored items;
        `self.scores` holds the per-item lists.

        Unless 'train' is one of the splits, raises AssertionError when any
        expected instruction id is absent from the results.
        '''
        self.scores = defaultdict(list)
        remaining = set(self.instr_ids)
        if isinstance(output_file, str):
            with open(output_file) as f:
                results = json.load(f)
            source = output_file
        else:
            results = output_file
            # Keep the error message short: never dump the whole results list into it.
            source = '<in-memory results>'

        print('result length', len(results))
        for item in results:
            # Check against expected ids; unknown or repeated ids are ignored.
            if item['instr_id'] in remaining:
                remaining.remove(item['instr_id'])
                self._score_item(item['instr_id'], item['trajectory'])

        if 'train' not in self.splits:  # Exclude the training from this. (Because training eval may be partial)
            assert len(remaining) == 0, 'Missing %d of %d instruction ids from %s - not in %s'\
                % (len(remaining), len(self.instr_ids), ",".join(self.splits), source)
            assert len(self.scores['nav_errors']) == len(self.instr_ids)

        nav_errors = self.scores['nav_errors']
        oracle_errors = self.scores['oracle_errors']
        score_summary = {
            'nav_error': np.average(nav_errors),
            'oracle_error': np.average(oracle_errors),
            'steps': np.average(self.scores['trajectory_steps']),
            'lengths': np.average(self.scores['trajectory_lengths']),
        }
        num_successes = len([e for e in nav_errors if e < self.error_margin])
        score_summary['success_rate'] = float(num_successes) / float(len(nav_errors))
        oracle_successes = len([e for e in oracle_errors if e < self.error_margin])
        score_summary['oracle_rate'] = float(oracle_successes) / float(len(oracle_errors))

        # Success weighted by path length: l is the shortest start-to-goal
        # length and p the length actually travelled; the 0.01 floor avoids a
        # division by zero when both are 0.
        spl = [
            float(error < self.error_margin) * l / max(l, p, 0.01)
            for error, p, l in zip(
                nav_errors,
                self.scores['trajectory_lengths'],
                self.scores['shortest_lengths'],
            )
        ]
        score_summary['spl'] = np.average(spl)
        return score_summary, self.scores