-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcompactdenial.py
executable file
·157 lines (124 loc) · 4.7 KB
/
compactdenial.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
#!/usr/bin/env python3
"""
compactdenial.py
Library of functions to work with Compact Denial of Existence.
Author: Shumon Huque
"""
# When releasing new version, update git tag too.
__version__ = "0.0.5"
import dns.resolver
import dns.query
import dns.name
import dns.rdatatype
import dns.rcode
# NXNAME meta type code point
NXNAME_RRTYPE = 128
# Compact Answer OK (CO) EDNS Header Flag
EDNS_FLAG_CO = 0x4000
# Default resolver list
RESOLVER_LIST = ['8.8.8.8', '1.1.1.1']
# Other parameters
DEFAULT_UDP_PAYLOAD = 1420
DEFAULT_QUERY_TIMEOUT = 5
def get_resolver(addresses=None, lifetime=5, payload=1420, coflag=False):
"""
Return resolver object configured to use given list of addresses, and
that sets DO=1, RD=1, AD=1, and EDNS payload for queries to the resolver.
"""
ednsflags = dns.flags.DO
if coflag:
ednsflags |= EDNS_FLAG_CO
resolver = dns.resolver.Resolver()
resolver.set_flags(dns.flags.RD | dns.flags.AD)
resolver.use_edns(edns=0, ednsflags=ednsflags, payload=payload)
resolver.lifetime = lifetime
if addresses is not None:
resolver.nameservers = addresses
return resolver
def is_authenticated(msg):
"""Does DNS message have Authenticated Data (AD) flag set?"""
return msg.flags & dns.flags.AD == dns.flags.AD
def nsec_type_set(type_bitmaps):
"""
Return set of RR types present in given NSEC record's type bitmaps.
"""
type_set = set()
for (window, _, bitnumbers) in nsec_windows(type_bitmaps):
for bitnumber in bitnumbers:
rrtype = window * 256 + bitnumber
type_set.add(rrtype)
return type_set
def nsec_windows(type_bitmaps):
"""
Iterator that returns info about the next NSEC windowed bitmap:
Window#, bitmap field, and bit numbers that are set.
"""
for (window, bitmap) in type_bitmaps:
bitnumbers = []
for i, _ in enumerate(bitmap):
for j in range(0, 8):
if bitmap[i] & (0x80 >> j):
bitnumbers.append(i * 8 + j)
yield window, bitmap, bitnumbers
def rcode(msg, qname):
"""
Return rcode for given DNS response message. If a compact denial
style NOERROR response is detected, return NXDOMAIN. Otherwise
return the actual rcode observed in the DNS reply message.
A compact denial style NOERROR response is a NXDOMAIN response
disguised as a NOERROR/NODATA. It is identified by a NOERROR
response with an empty answer section, and an authority section
containing an NSEC record matching the query name that contains
in its type bitmaps field: NSEC, RRSIG, and the NXNAME sentinel type.
It is sufficent to only check for the presence of NXNAME.
https://datatracker.ietf.org/doc/draft-ietf-dnsop-compact-denial-of-existence/
"""
if not isinstance(qname, dns.name.Name):
qname = dns.name.from_text(qname)
if (msg.rcode() == dns.rcode.NOERROR and not msg.answer):
for rrset in msg.authority:
if rrset.name != qname:
continue
if rrset.rdtype != dns.rdatatype.NSEC:
continue
for rdata in rrset.to_rdataset():
if NXNAME_RRTYPE in nsec_type_set(rdata.windows):
return dns.rcode.NXDOMAIN
return msg.rcode()
return msg.rcode()
def query_resolver(qname, qtype, resolver=None):
"""
Queries a DNS resolver for a given DNS qname and qtype and returns
the response message.
"""
if resolver is None:
resolver = get_resolver()
if not isinstance(qname, dns.name.Name):
qname = dns.name.from_text(qname)
try:
msg = resolver.resolve(qname, qtype, raise_on_no_answer=False).response
except dns.resolver.NXDOMAIN as error:
return error.response(qname)
return msg
def query_server(qname, qtype, server, coflag=False):
"""
Queries a DNS server directly for a given DNS qname and qtype and returns
the response message. Uses UDP transport with fallback to TCP upon
truncation.
"""
if not isinstance(qname, dns.name.Name):
qname = dns.name.from_text(qname)
ednsflags = dns.flags.DO
if coflag:
ednsflags |= EDNS_FLAG_CO
query = dns.message.make_query(qname,
qtype,
use_edns=True,
ednsflags=ednsflags,
want_dnssec=True,
payload=DEFAULT_UDP_PAYLOAD)
query.flags &= ~dns.flags.RD
msg, _ = dns.query.udp_with_fallback(query, server,
timeout=DEFAULT_QUERY_TIMEOUT,
ignore_unexpected=True)
return msg