-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvalidate-geofeed.py
executable file
·179 lines (142 loc) · 5.54 KB
/
validate-geofeed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
#!/usr/bin/python
#
# Copyright (c) 2012 IETF Trust and the persons identified as
# authors of the code. All rights reserved. Redistribution and use
# in source and binary forms, with or without modification, is
# permitted pursuant to, and subject to the license terms contained
# in, the Simplified BSD License set forth in Section 4.c of the
# IETF Trust's Legal Provisions Relating to IETF
# Documents (http://trustee.ietf.org/license-info).
"""Simple format validator for self-published ipgeo feeds.
This tool reads CSV data in the self-published ipgeo feed format
from the standard input and performs basic validation. It is
intended for use by feed publishers before launching a feed.
"""
import csv
import ipaddr
import re
import sys
class IPGeoFeedValidator(object):
    """Validator for lines of a self-published ipgeo feed.

    Every feed line is parsed as one CSV record. The first field must be
    an IP address or CIDR prefix, the optional second field an
    ISO 3166-1 alpha 2 code, and the optional third field an ISO 3166-2
    region code; five fields are expected in total.

    Problems are collected in ``output_log`` (a dict mapping the severity
    string, 'ERROR' or 'WARNING', to the list of formatted messages) and,
    unless logging is disabled, also written to the output stream.
    """

    def __init__(self):
        self.prefixes = {}
        self.line_number = 0
        self.output_log = {}
        # Per-line state; initialised here so the reporting helpers are
        # safe to call before the first line has been processed.
        self.line = ''
        self.is_correct_line = True
        self.SetOutputStream(sys.stderr)

    def Validate(self, feed):
        """Check validity of an IPGeo feed.

        Args:
          feed: iterable with feed lines
        """
        for line in feed:
            self._ValidateLine(line)

    def SetOutputStream(self, logfile):
        """Controls where the output messages go to (STDERR by default).

        Use None to disable logging.

        Args:
          logfile: a file object (e.g., sys.stdout) or None.
        """
        self.output_stream = logfile

    def CountErrors(self, severity):
        """How many ERRORs or WARNINGs were generated.

        Args:
          severity: the severity string to count, 'ERROR' or 'WARNING'.

        Returns:
          The number of messages logged so far with that severity.
        """
        return len(self.output_log.get(severity, []))

    ############################################################

    def _ValidateLine(self, line):
        """Validate one raw feed line and echo it if it had problems.

        Args:
          line: a single feed line, with or without its line terminator.
        """
        line = line.rstrip('\r\n')
        self.line_number += 1
        # Only the text before the first '#' is echoed in diagnostics.
        self.line = line.split('#')[0]
        self.is_correct_line = True

        if self._ShouldIgnoreLine(line):
            return

        # The line is a single CSV record; take its one parsed row.
        fields = next(csv.reader([line]))

        self._ValidateFields(fields)
        self._FlushOutputStream()

    def _ShouldIgnoreLine(self, line):
        """Return True for blank lines and lines starting with '#'."""
        stripped = line.strip()
        return not stripped or stripped.startswith('#')

    ############################################################

    def _ValidateFields(self, fields):
        """Run every per-field check on one parsed feed record.

        All checks are always run so that every problem on the line is
        reported, not just the first one.

        Args:
          fields: non-empty list of the CSV fields of one feed line.

        Raises:
          AssertionError: if ``fields`` is empty.
        """
        # Raised explicitly (not via ``assert``) so that the guard is
        # still active when Python runs with -O.
        if not fields:
            raise AssertionError('A feed line must have at least one field.')

        self._IsIPAddressOrPrefixCorrect(fields[0])

        if len(fields) > 1:
            self._IsAlpha2CodeCorrect(fields[1])

        if len(fields) > 2:
            self._IsRegionCodeCorrect(fields[2])

        if len(fields) != 5:
            self._ReportWarning('5 fields were expected (got %d).'
                                % len(fields))

    ############################################################

    def _IsIPAddressOrPrefixCorrect(self, field):
        """Validate the first field as a CIDR prefix or a single address.

        Args:
          field: the first CSV field; treated as CIDR if it contains '/'.

        Returns:
          True if the field passed validation, False otherwise.
        """
        if '/' in field:
            return self._IsCIDRCorrect(field)
        return self._IsIPAddressCorrect(field)

    def _IsCIDRCorrect(self, cidr):
        """Validate a CIDR prefix, reporting an error if it is not usable.

        Args:
          cidr: prefix string such as '192.0.2.0/24'.

        Returns:
          True if the prefix parses, is written with its network address
          and is not private; False otherwise.
        """
        try:
            ipprefix = ipaddr.IPNetwork(cidr)
        except ValueError:
            # ipaddr signals unparsable input with ValueError subclasses.
            self._ReportError('Incorrect IP Network.')
            return False

        # The address as written must equal the computed network address.
        if ipprefix.network._ip != ipprefix._ip:
            self._ReportError('Incorrect IP Network.')
            return False
        if ipprefix.is_private:
            self._ReportError('IP Address must not be private.')
            return False
        return True

    def _IsIPAddressCorrect(self, ipaddress):
        """Validate a single IP address, reporting an error if unusable.

        Args:
          ipaddress: address string without a prefix length.

        Returns:
          True if the address parses and is not private; False otherwise.
        """
        try:
            ip = ipaddr.IPAddress(ipaddress)
        except ValueError:
            # ipaddr signals unparsable input with ValueError subclasses.
            self._ReportError('Incorrect IP Address.')
            return False

        if ip.is_private:
            self._ReportError('IP Address must not be private.')
            return False
        return True

    ############################################################

    def _IsAlpha2CodeCorrect(self, alpha2code):
        """Validate an optional two-letter country code.

        Args:
          alpha2code: the country field; an empty string is accepted.

        Returns:
          True if the field is empty or consists of exactly two letters;
          False (after reporting an error) otherwise.
        """
        if not alpha2code:
            return True
        if len(alpha2code) != 2 or not alpha2code.isalpha():
            self._ReportError(
                'Alpha 2 code must be in the ISO 3166-1 alpha 2 format.')
            return False
        return True

    def _IsRegionCodeCorrect(self, region_code):
        """Validate an optional '<country>-<subdivision>' region code.

        Args:
          region_code: the region field; an empty string is accepted.

        Returns:
          True if the field is empty, or is a two-letter country part and
          a subdivision of one to three alphanumeric characters joined by
          '-'; False (after reporting an error) otherwise.
        """
        if not region_code:
            return True

        country, separator, subdivision = region_code.partition('-')
        # An empty country part must be rejected here: the alpha 2 check
        # below deliberately accepts an empty string.
        if not separator or not country:
            self._ReportError('Region code must be in ISO 3166-2 format.')
            return False
        if not self._IsAlpha2CodeCorrect(country):
            return False
        if not 1 <= len(subdivision) <= 3 or not subdivision.isalnum():
            self._ReportError('Region code must be in ISO 3166-2 format.')
            return False
        return True

    ############################################################

    def _ReportError(self, message):
        """Record ``message`` with severity 'ERROR'."""
        self._ReportWithSeverity('ERROR', message)

    def _ReportWarning(self, message):
        """Record ``message`` with severity 'WARNING'."""
        self._ReportWithSeverity('WARNING', message)

    def _ReportWithSeverity(self, severity, message):
        """Log a message and mark the current line as incorrect.

        Args:
          severity: severity string used as the message prefix and as
            the key in ``output_log``.
          message: human-readable description of the problem.
        """
        self.is_correct_line = False
        output_line = '%s: %s\n' % (severity, message)

        self.output_log.setdefault(severity, []).append(output_line)

        if self.output_stream is not None:
            self.output_stream.write(output_line)

    def _FlushOutputStream(self):
        """Echo the current line to the output stream if it had problems."""
        if self.is_correct_line or self.output_stream is None:
            return
        self.output_stream.write('line %d: %s\n\n'
                                 % (self.line_number, self.line))
############################################################
def main():
    """Validate the feed read from standard input.

    Exits the process with status 1 when at least one ERROR was reported.
    """
    validator = IPGeoFeedValidator()
    validator.Validate(sys.stdin)

    error_count = validator.CountErrors('ERROR')
    if error_count > 0:
        sys.exit(1)


# Run the validator only when executed as a script, not when imported.
if __name__ == '__main__':
    main()