-
Notifications
You must be signed in to change notification settings - Fork 0
/
jsonderef.py
214 lines (188 loc) · 6.63 KB
/
jsonderef.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
import copy
import requests
from requests.exceptions import RequestException
class JsonDerefException(Exception):
"""
Generic exception
"""
pass
class RefNotFound(JsonDerefException):
"""
Raised if reference is not found
"""
pass
class JsonDeref(object):
def __init__(self, raise_on_not_found=True, not_found=None,
requests_timeout=10
):
""" Initializes dereferencer.
:param raise_on_not_found: If true, RefNotFound is raised if referenced
object is not found.
:type raise_on_not_found: Boolean
:param not_found: In case of referenced object is not found and
raise_on_not_found is False, this value will be used instead of
referenced object.
:type raise_on_not_found: Anything
:param requests_timeout: Timeout set for requests in case of fetching
remote urls.
:type requests_timeout: Integer
"""
self._cache = {}
self._raise_on_not_found = raise_on_not_found
self._not_found = not_found
self._timeout = requests_timeout
def deref(self, document, max_deref_depth=10):
""" Returns dereferenced object.
Original object is left intact, always new copy of the object is
returned.
:param max_deref_depth: How many times do the recursive dereference.
type: Integer or None
"""
return self._do_deref(document, document, max_deref_depth)
@staticmethod
def _parse_ref_string(ref):
"""
Parses string returning ref object
"""
ref_object = {}
if ref.startswith("#"):
ref_object["type"] = "local"
ref = ref[1:]
elif ref.startswith("http"):
ref_object["type"] = "remote"
hash_index = ref.rfind("#")
ref_object["url"] = ref[:hash_index]
ref = ref[hash_index+1:]
else:
raise JsonDerefException(
"Cannot resolve reference '{0}'".format(ref)
)
ref_object["path"] = []
if ref != "" and not ref.startswith('/'):
raise JsonDerefException(
"Path in the reference must start with '/' ({0}).".format(ref)
)
path = ref.split("/")
# Rfc stuff
for item in path:
itm = item.replace("~1", "/")
itm = itm.replace("~0", "~")
ref_object["path"].append(itm)
return ref_object
def _get_url_json(self, url, store=True):
"""
Returns object stored at url
"""
doc = self._cache.get(url, None)
if doc is not None:
return doc
try:
rsp = requests.get(url, timeout=self._timeout)
if rsp.status_code != 200:
raise RefNotFound(
"Could not get {0}, status code {1}".format(
url, rsp.status_code
)
)
doc = rsp.json()
self._cache[url] = doc
return doc
except ValueError as exc:
raise JsonDerefException(
"Document at {0} is not a valid json. "
"Parser says '{1}'.".format(
url, exc.message
)
)
except RequestException as e:
raise RefNotFound(
"Could not get {0}, error {1}".format(
url, e.message
)
)
def _get_referenced_object(self, cur_root, ref_obj):
"""
Returns referenced object
"""
actual_root = None
if ref_obj["type"] == "local":
actual_root = cur_root
elif ref_obj["type"] == "remote":
try:
actual_root = self._get_url_json(ref_obj["url"])
except RefNotFound:
if self._raise_on_not_found:
raise
else:
return {
"root": cur_root,
"obj": copy.deepcopy(self._not_found)
}
if len(ref_obj["path"]) == 1:
return {
"root": actual_root,
"obj": actual_root
}
else:
cur_obj = actual_root
try:
for p in ref_obj["path"][1:]:
if isinstance(cur_obj, dict):
cur_obj = cur_obj[str(p)]
elif isinstance(cur_obj, list):
cur_obj = cur_obj[int(p)]
return {
"root": actual_root,
"obj": cur_obj
}
except (KeyError, IndexError):
if self._raise_on_not_found:
raise RefNotFound(
"Referenced object in path #{0}"
" has not been found".format(
"/".join(ref_obj["path"])
)
)
else:
return {
"root": cur_root,
"obj": copy.deepcopy(self._not_found)
}
def _do_deref(self, current_root, current_obj, remaining_depth):
"""
Does the recursive job
"""
# No remaining level of dereferencing -just return whatever we have
if remaining_depth == 0:
return copy.deepcopy(current_obj)
# dictionary is either ref or it's keys may contain refs
if isinstance(current_obj, dict):
ref = current_obj.get("$ref", None)
# Ok, this object is reference
if ref is not None:
# Get the object reference is pointing to
result = self._get_referenced_object(
current_root, JsonDeref._parse_ref_string(ref)
)
# And do the deref on it again...:)
return self._do_deref(
result["root"], result["obj"], remaining_depth - 1
)
else:
new_obj = {}
for key in current_obj:
new_obj[key] = self._do_deref(
current_root, current_obj[key], remaining_depth
)
return new_obj
# List may containg refs
elif isinstance(current_obj, list):
new_list = []
for item in current_obj:
new_list.append(
self._do_deref(current_root, item, remaining_depth)
)
return new_list
# Anything else can be just returned
else:
return copy.deepcopy(current_obj)