-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathmeasure-dns-nsid.py
140 lines (128 loc) · 4.34 KB
/
measure-dns-nsid.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python
""" Python code to start a RIPE Atlas UDM (User-Defined
Measurement). This one is for running DNS NSID queries (find the
identity of a name server).
By default, it runs measurements in six geographical areas. You can
use -l to run it on specific probes instead.
It outputs the ID of the measurements.
Stephane Bortzmeyer <bortzmeyer@nic.fr>
"""
import urllib2
import urllib
import json
import time
import sys
import time
import getopt
import os
import string
# Default values; each can be overridden from the command line.
family = 4            # IP address family, sent as "af" in the definition
selection = "area"    # Probe selection mode: "area" or "list" (set by -l)
query = "fr"          # Name to query (-q)
qtype = "SOA"         # DNS query type (-t)
num_probes = 500      # Probes requested per area (-n)
one_area = False      # When True (-o), only the "WW" area is measured

# Parameters
descr = "Check identity of %s anycast instance"  # %s receives the target
# Body of the measurement request. The "af", "target" and "description"
# fields and the probe selection are filled in after option parsing.
data = {
    "definitions": [
        {
            "packets": 1,
            "use_NSID": True,
            "query_argument": query,
            "query_class": "IN",
            "query_type": qtype,
            "type": "dns",
            "is_oneoff": True,
        },
    ],
    "probes": [],
}
# File whose first line is the API key. expanduser() is used instead of
# os.environ['HOME'] so that an unset HOME does not raise KeyError here.
authfile = os.path.join(os.path.expanduser("~"), ".atlas", "auth")
class CredentialsNotFound(Exception):
    """Raised when the API key file does not exist.

    The exception argument is the path that was looked up.
    """
class JsonRequest(urllib2.Request):
    """A urllib2 request whose body and expected reply are both JSON."""

    def __init__(self, url):
        """Create a request for *url* with JSON Content-Type and Accept headers."""
        # Explicit base-class call, as in the original code.
        urllib2.Request.__init__(self, url)
        for header in ("Content-Type", "Accept"):
            self.add_header(header, "application/json")
def usage(msg=None):
    """Write the command-line synopsis to standard error.

    msg -- optional extra line (a string or an exception) printed after
    the synopsis; nothing extra is printed when it is None.
    """
    # The synopsis lists every short option accepted by the getopt string
    # "46q:t:n:l:ho" except -h itself.
    sys.stderr.write(
        "Usage: %s [-4] [-6] [-q name] [-t type] [-n N] [-o] [-l N,N,N...] target\n"
        % sys.argv[0])
    if msg is not None:
        # One-element tuple so that any object, including a tuple, formats safely.
        sys.stderr.write("%s\n" % (msg,))
# Parse the command line. Any error prints the usage and exits with
# status 1; -h/--help prints the usage and exits with status 0.
try:
    optlist, args = getopt.getopt(sys.argv[1:], "46q:t:n:l:ho",
                                  ["list_probes=", "type=", "number=",
                                   "one_area",
                                   "query=", "help"])
    for option, value in optlist:
        if option in ("--help", "-h"):
            usage()
            sys.exit(0)
        elif option in ("--list_probes", "-l"):
            # Comma-separated probe identifiers; switches to "list" mode.
            probes = value.split(',')
            selection = "list"
        elif option in ("--query", "-q"):
            query = value
        elif option in ("--one_area", "-o"):
            one_area = True
        elif option in ("--type", "-t"):
            qtype = value
        elif option in ("--number", "-n"):
            try:
                num_probes = int(value)
            except ValueError:
                # Report a bad number like any other command-line error
                # instead of dying with a traceback.
                usage("Invalid number of probes \"%s\"" % value)
                sys.exit(1)
        elif option == "-4":
            # Integers, to match the type of the default value of family.
            family = 4
        elif option == "-6":
            family = 6
        else:
            # Should never occur, it is trapped by getopt
            sys.stderr.write("Unknown option %s\n" % option)
            usage()
            sys.exit(1)
except getopt.error as reason:
    usage(reason)
    sys.exit(1)
# Exactly one positional argument is required: the target of the measurement.
if len(args) != 1:
    usage()
    sys.exit(1)
target = args[0]
# Copy the command-line choices into the single measurement definition.
data["definitions"][0]["af"] = family
data["definitions"][0]["target"] = target
# The definition stores the queried name under "query_argument"; writing
# it to a separate "query" key left the default name in place, so -q had
# no effect on that field.
data["definitions"][0]["query_argument"] = query
data["definitions"][0]["query_type"] = qtype
data["definitions"][0]["description"] = descr % target
# Describe which probes to use. In "area" mode the area name ("value")
# is filled in later, once per area, just before each submission.
if selection == "area":
    data["probes"].append({"requested": num_probes, "type": "area"})
elif selection == "list":
    data["probes"].append({"requested": len(probes), "type": "probes",
                           "value": ','.join(probes)})
else:
    sys.stderr.write("Internal error, invalide selection \"%s\"\n" % selection)
    sys.exit(1)
# Read the API key from the first line of the credentials file and build
# the measurement-creation URL from it.
if not os.path.exists(authfile):
    raise CredentialsNotFound(authfile)
with open(authfile) as auth:
    # Strip only the line terminator. Slicing with [:-1] would remove the
    # last character of the key when the file has no trailing newline.
    key = auth.readline().rstrip("\r\n")
url = "https://atlas.ripe.net/api/v1/measurement/?key=%s" % key
def _submit_measurement(label):
    """Send the current request body to the API and print the result.

    Serializes the module-level ``data`` as JSON, POSTs it to ``url`` and
    prints "<label>: measurement #<ids>" using the "measurements" field
    of the JSON reply.

    label -- value shown in front of the measurement IDs (an area name or
    the list of probes).

    On an HTTP error, the body of the error reply is written to standard
    error and the urllib2.HTTPError is re-raised.
    """
    request = JsonRequest(url)
    try:
        conn = urllib2.urlopen(request, json.dumps(data))
    except urllib2.HTTPError as e:
        sys.stderr.write("Fatal error: %s\n" % e.read())
        raise
    try:
        results = json.load(conn)
    finally:
        # Close exactly once, even when the reply is not valid JSON.
        conn.close()
    print("%s: measurement #%s" % (label, results["measurements"]))


if selection == "area":
    if one_area:
        list_areas = ["WW"]
    else:
        list_areas = ["WW", "West", "North-East", "South-East",
                      "North-Central", "South-Central"]
    # One measurement per area: only the area name changes between requests.
    for area in list_areas:
        data["probes"][0]["value"] = area
        _submit_measurement(area)
elif selection == "list":
    _submit_measurement(probes)