forked from rosin-project/rosinstall_generator_time_machine
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfind_closest_cache.py
executable file
·133 lines (109 loc) · 4.4 KB
/
find_closest_cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2018, G.A. vd. Hoorn
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# searches for the sub dir in BASE_DIR that:
#
# a) has a rosdistro cache file for DISTRO, and
# b) is closest in terms of time based on 'dir name -> epoch' conversion
#
# If there is a direct match (so a cache for DISTRO in a dir for the
# exact STAMP) that is returned.
#
# In absence of a direct match, all dirs with a cache for DISTRO are
# checked for distance to STAMP. The closest one will be returned.
#
# If there are no dirs with caches for DISTRO, then an empty
# string is returned and the exit status is 1.
#
# Exit status is 0 in all other cases.
# this script expects a directory structure with compressed rosdistro
# cache files sorted according to timestamp.
#
# Example:
#
#   BASE_DIR
#   └── 123456789
#       └── hydro-cache.yaml.gz
#
# each directory (with a 'unix epoch stamp' as name) can contain
# cache files for each of the ROS repositories.
import datetime
import dateutil.parser
from os import path, listdir
import sys
import argparse
from bisect import bisect_left
verbose = False
def log(s):
    """Write the message *s* to stderr, but only when verbose mode is on.

    Reads the module-level ``verbose`` flag on every call. A newline is
    appended to the message before it is written.
    """
    if not verbose:
        return
    line = s + '\n'
    sys.stderr.write(line)
def find_closest_entry(lst, n):
    """Return the element of the sorted list *lst* that is nearest to *n*.

    *lst* must be sorted in ascending order (it is searched with
    ``bisect_left``). Values of *n* below the first element or above the last
    element clamp to that element. When *n* is exactly halfway between two
    neighbours, the smaller neighbour is returned.

    Raises IndexError when *lst* is empty.
    """
    pos = bisect_left(lst, n)

    # n sorts before everything: the first entry is the closest one
    if pos == 0:
        return lst[0]
    # n sorts after everything: the last entry is the closest one
    if pos == len(lst):
        return lst[-1]

    lower, upper = lst[pos - 1], lst[pos]
    # strict comparison: a tie resolves to the lower neighbour
    return upper if (upper - n) < (n - lower) else lower
def has_cache_file(d, distro):
    """Return True if directory *d* contains '<distro>-cache.yaml.gz'.

    Only regular files count; a missing directory or missing file yields
    False.
    """
    cache_name = distro + '-cache.yaml.gz'
    cache_path = path.join(d, cache_name)
    return path.isfile(cache_path)
def _stamp_to_epoch(stamp):
    """Convert the STAMP command line argument to whole Unix epoch seconds.

    *stamp* is a string. It is interpreted, in this order, as:

      1. an integer number of seconds since the epoch,
      2. a fractional number of seconds (truncated to an integer),
      3. a date/time string understood by ``dateutil.parser.parse``
         (ISO 8601 among others).

    A parsed datetime that carries a UTC offset is converted using that
    offset. A datetime without any offset is interpreted as local time.

    Returns an int. Errors raised by ``dateutil.parser.parse`` for
    unparseable input are not caught here.
    """
    # function-scope imports: only this helper needs these modules
    import calendar
    import time

    try:
        return int(stamp)
    except ValueError:
        pass
    try:
        return int(float(stamp))
    except (ValueError, OverflowError):
        # not numeric (or inf/nan): fall through to date parsing
        pass

    parsed = dateutil.parser.parse(stamp)
    if parsed.tzinfo is not None and parsed.utcoffset() is not None:
        # Aware datetime: honour its offset. strftime('%s') is a
        # non-standard directive and disregards the offset entirely.
        return calendar.timegm(parsed.utctimetuple())
    # Naive datetime: treat the fields as local time
    return int(time.mktime(parsed.timetuple()))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-a', '--absolute', action='store_true',
        dest='absolute', help='Return absolute paths (default: return relative paths)')
    parser.add_argument('-v', '--verbose', action='store_true', dest='verbose')
    parser.add_argument('BASE_DIR', help='The base cache directory')
    parser.add_argument('DISTRO', help='The ROS distro to find the closest cache dir for')
    parser.add_argument('STAMP', help='The Unix epoch or ISO 8601 datetime '
        'to find the closest cache dir for')
    args = parser.parse_args()

    # module-level flag consulted by log()
    verbose = args.verbose
    base_dir = args.BASE_DIR
    ros_distro = args.DISTRO.lower()

    # normalise STAMP once; everything below works with whole epoch seconds
    epoch = _stamp_to_epoch(args.STAMP)
    stamp = str(epoch)

    iso_date = datetime.datetime.utcfromtimestamp(epoch).isoformat() + 'Z'
    log("INFO: searching for {} cache closest to: {} ({})".format(ros_distro, stamp, iso_date))
    log("INFO: using rosdistro cache dir: {}".format(base_dir))

    # if we have an exact match (ie: cache exists for stamp), just return that
    candidate = path.join(base_dir, stamp)
    if has_cache_file(candidate, ros_distro):
        log("INFO: found exact match, returning: {}".format(candidate))
        closest_cache_path = candidate

    else:
        # no exact match: filter all candidates based on whether they have
        # a cache file for the distribution we're interested in
        try:
            entries = listdir(base_dir)
        except OSError as e:
            # a missing/unreadable base dir simply has no usable caches:
            # report it and fall through to the 'no candidates' result
            log("ERR : cannot list rosdistro cache dir: {}".format(e))
            entries = []

        candidates = [d for d in entries if has_cache_file(path.join(base_dir, d), ros_distro)]
        # only dirs named after an epoch stamp can be compared against STAMP
        epochs = sorted(int(d) for d in candidates if d.isdigit())

        if not epochs:
            log("ERR : no candidates left after filtering")
            closest_cache_path = ''
        else:
            entry = find_closest_entry(epochs, epoch)
            closest_cache_path = path.join(base_dir, str(entry))
            log("INFO: closest cache dir: {}".format(closest_cache_path))

    if args.absolute and closest_cache_path != '':
        closest_cache_path = path.abspath(closest_cache_path)

    log("INFO: returning: {}".format(closest_cache_path))
    print(closest_cache_path)

    # empty result means nothing was found: signal that via the exit status
    sys.exit(0 if closest_cache_path != '' else 1)