-
Notifications
You must be signed in to change notification settings - Fork 55
/
Copy pathutf8_sanitizer.rb
292 lines (246 loc) · 8.45 KB
/
utf8_sanitizer.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# encoding: ascii-8bit
# frozen_string_literal: true
require 'uri'
require 'stringio'
require 'rack/request'
module Rack
class UTF8Sanitizer
# Alias the top-level ::StringIO under this class so that a bare `StringIO`
# reference inside UTF8Sanitizer resolves to it.
StringIO = ::StringIO
# Matches a single NUL (0x00) byte.
NULL_BYTE_REGEX = /\x00/.freeze
# Raised by the built-in :exception strategy when null-byte sanitizing is
# enabled and the input contains a NUL byte.
class NullByteInString < StandardError; end
# Wraps +app+ with the sanitizer.
#
# Recognised +options+ keys:
#   :strategy                  - handed to `build_strategy`.
#   :sanitizable_content_types - Array; when given it is used as the complete
#                                list of body content types to sanitize.
#   :additional_content_types  - Array; appended to SANITIZABLE_CONTENT_TYPES
#                                when no complete list was given.
#   :only / :except            - matchers, stored as flat arrays and consulted
#                                by `skip?`.
#   :sanitize_null_bytes       - defaults to false.
def initialize(app, options={})
  @app = app
  @strategy = build_strategy(options)
  @sanitizable_content_types =
    options[:sanitizable_content_types] ||
    (SANITIZABLE_CONTENT_TYPES + (options[:additional_content_types] || []))
  @only, @except = [:only, :except].map { |name| Array(options[name]).flatten }
  @sanitize_null_bytes = options.fetch(:sanitize_null_bytes, false)
end
# Rack entry point: sanitizes +env+ and passes the result to the wrapped app.
# If sanitizing raises EOFError, a plain-text 400 response is returned and
# the wrapped app is not called.
def call(env)
  sanitized_env =
    begin
      sanitize(env)
    rescue EOFError
      return [400, { "Content-Type" => "text/plain" }, ["Bad Request"]]
    end
  @app.call(sanitized_env)
end
# Built-in sanitizing strategies, keyed by the name accepted in
# options[:strategy]. Each strategy takes a mutable string, retags it as
# ASCII-8BIT and transcodes it in place to UTF-8.
DEFAULT_STRATEGIES = {
  # Substitutes the replacement character for everything that cannot be
  # transcoded; when asked, also returns a copy with NUL bytes removed.
  replace: ->(input, sanitize_null_bytes: false) {
    input.force_encoding(Encoding::ASCII_8BIT)
    input.encode!(Encoding::UTF_8, invalid: :replace, undef: :replace)
    sanitize_null_bytes ? input.gsub(NULL_BYTE_REGEX, "") : input
  },
  # Lets the transcoding error propagate; when asked, raises
  # NullByteInString if the transcoded string contains a NUL byte.
  exception: ->(input, sanitize_null_bytes: false) {
    input.force_encoding(Encoding::ASCII_8BIT)
    input.encode!(Encoding::UTF_8)
    raise NullByteInString if sanitize_null_bytes && NULL_BYTE_REGEX.match?(input)
    input
  }
}.freeze
# Rack env keys whose values `sanitize` treats as URI-encoded strings.
# See https://github.com/rack/rack/blob/main/SPEC.rdoc
URI_FIELDS = %w[
  SCRIPT_NAME
  REQUEST_PATH
  REQUEST_URI
  PATH_INFO
  QUERY_STRING
  HTTP_REFERER
  ORIGINAL_FULLPATH
  ORIGINAL_SCRIPT_NAME
  SERVER_NAME
].freeze

# Request body content types that are sanitized by default.
SANITIZABLE_CONTENT_TYPES = %w[
  text/plain
  application/x-www-form-urlencoded
  application/json
  text/javascript
].freeze

# Content types whose body is additionally treated as percent-encoded data.
URI_ENCODED_CONTENT_TYPES = %w[application/x-www-form-urlencoded].freeze

# Prefix identifying the env keys that `sanitize` cleans as plain strings.
HTTP_ = 'HTTP_'
# Sanitizes a Rack env in place and returns it.
#
# The request body and the cookie header are handled first; afterwards every
# entry not excluded by `skip?` is processed: URI_FIELDS values are normalised
# as URI-encoded strings, and values of keys starting with HTTP_ are sanitized
# as plain strings. A cleaned value is frozen if the value it replaces was.
def sanitize(env)
  sanitize_rack_input(env)
  sanitize_cookies(env)
  env.each_pair do |name, raw|
    next if skip?(name)

    if URI_FIELDS.include?(name)
      cleaned = sanitize_uri_encoded_string(raw)
    elsif name.to_s.start_with?(HTTP_)
      # Headers are only sanitized and left in UTF-8: there is no reason
      # to have UTF-8 in headers, but valid UTF-8 is allowed through.
      cleaned = sanitize_string(raw)
    else
      next
    end
    env[name] = transfer_frozen(raw, cleaned)
  end
end
protected
# Decides whether +rack_env_key+ is left untouched.
# A key is skipped when any :except matcher matches it, or when :only
# matchers were configured and none of them matches it. Matchers are applied
# via `rack_env_key[matcher]`.
def skip?(rack_env_key)
  excluded = @except.any? { |matcher| rack_env_key[matcher] }
  return true if excluded
  return false if @only.empty?

  @only.none? { |matcher| rack_env_key[matcher] }
end
# Resolves options[:strategy] (default :replace) to the object stored as the
# sanitizing strategy: names present in DEFAULT_STRATEGIES map to the built-in
# lambda, anything else is returned unchanged.
def build_strategy(options)
  chosen = options.fetch(:strategy, :replace)
  DEFAULT_STRATEGIES.fetch(chosen, chosen)
end
# Replaces env['rack.input'] with a sanitized copy of the request body.
#
# Nothing happens unless the request's media type is one of the configured
# sanitizable content types, its declared charset (if any) is utf-8, and
# env['rack.input'] is present. An existing CONTENT_LENGTH entry is rewritten
# to the size of the sanitized body; a missing one is not added.
def sanitize_rack_input(env)
  request = Rack::Request.new(env)
  media_type = request.media_type
  return unless @sanitizable_content_types.any? { |candidate| media_type == candidate }

  declared_charset = request.content_charset
  return if declared_charset && declared_charset.downcase != 'utf-8'

  body = env['rack.input']
  return unless body

  form_encoded = URI_ENCODED_CONTENT_TYPES.any? { |candidate| media_type == candidate }
  declared_length = env['CONTENT_LENGTH']&.to_i
  replacement = sanitize_io(body, form_encoded, declared_length)
  env['rack.input'] = replacement
  env['CONTENT_LENGTH'] &&= replacement.size.to_s
end
# Stand-in for env['rack.input'] that serves the sanitized request body.
# Modeled after Rack::RewindableInput.
# TODO: Should this delegate any methods to the original io?
class SanitizedRackInput
  # original_io  - the stream the body was read from; kept only so that
  #                `close` can close it as well.
  # sanitized_io - IO-like object holding the sanitized body; every read
  #                operation is served from it.
  def initialize(original_io, sanitized_io)
    @upstream_io = original_io
    @clean_io = sanitized_io
  end

  def gets
    @clean_io.gets
  end

  def read(*args)
    @clean_io.read(*args)
  end

  def each(&block)
    @clean_io.each(&block)
  end

  def rewind
    @clean_io.rewind
  end

  # Size of the sanitized body (StringIO#size is the bytesize).
  def size
    @clean_io.size
  end

  # Closes the sanitized copy, then the original stream if it can be closed.
  def close
    @clean_io.close
    return unless @upstream_io.respond_to?(:close)

    @upstream_io.close
  end
end
# Reads the request body from +io+ and wraps a sanitized copy of it.
#
# io             - the original input stream; must respond to `read`.
# uri_encoded    - when true, the body is additionally normalised as
#                  percent-encoded data and tagged as UTF-8.
# content_length - number of bytes to read; nil or a negative value reads
#                  until the end of the stream.
#
# Returns a SanitizedRackInput serving the sanitized body. The sanitized
# string is frozen if the string read from +io+ was frozen.
def sanitize_io(io, uri_encoded = false, content_length = nil)
  raw = if content_length && content_length >= 0
    io.read(content_length)
  else
    io.read
  end
  # `read` with a positive length returns nil when the stream is already at
  # its end (e.g. CONTENT_LENGTH announces bytes that were never sent).
  # Treat that as an empty body instead of failing on nil further down.
  input = raw || String.new
  sanitized_input = sanitize_string(strip_byte_order_mark(input))
  if uri_encoded
    sanitized_input = sanitize_uri_encoded_string(sanitized_input).
      force_encoding(Encoding::UTF_8)
  end
  sanitized_input = transfer_frozen(input, sanitized_input)
  SanitizedRackInput.new(io, StringIO.new(sanitized_input))
end
# Sanitizes env['HTTP_COOKIE'] in place; does nothing when it is absent.
#
# Cookies need to be split and then sanitized as url encoded strings
# since the cookie string itself is not url encoded (separated by `;`),
# and the normal method of `sanitize_uri_encoded_string` would break
# later cookie parsing in the case that a cookie value contained an
# encoded `;`.
def sanitize_cookies(env)
  header = env['HTTP_COOKIE']
  return unless header

  # Split a binary copy of the header: matching a regexp against a string
  # that is tagged as UTF-8 but contains invalid bytes raises ArgumentError,
  # and at this point nothing has been sanitized yet.
  env['HTTP_COOKIE'] = header.b
    .split(/[;,] */n)
    .map { |cookie| sanitize_uri_encoded_string(cookie) }
    .join('; ')
end
# Normalises a percent-encoded value so that it is UTF-8-safe.
#
# URI.encode/decode expect ASCII-8BIT input, yet invalid UTF-8 may be present
# both as raw bytes and in percent-encoded form. The value is therefore
# sanitized, percent-decoded (`decode_string`), then sanitized again and
# encoded back (`reencode_string`).
#
# nil is returned unchanged.
def sanitize_uri_encoded_string(input)
  input.nil? ? input : reencode_string(decode_string(input))
end
# Second half of `sanitize_uri_encoded_string`: sanitizes the percent-decoded
# value and escapes the result again.
def reencode_string(decoded_value)
  sanitized = sanitize_string(decoded_value)
  escape_unreserved(sanitized)
end
# First half of `sanitize_uri_encoded_string`: sanitizes +input+, retags the
# sanitized string as ASCII-8BIT and percent-decodes it.
def decode_string(input)
  binary = sanitize_string(input).force_encoding(Encoding::ASCII_8BIT)
  unescape_unreserved(binary)
end
# Bytes that `unescape_unreserved` decodes from their percent-encoded form:
# all 'unreserved' characters from RFC3986 (2.3), plus every byte in the
# 0x80-0xFF range, i.e. the bytes multibyte UTF-8 characters are made of.
UNRESERVED_OR_UTF8 = /[A-Za-z0-9\-._~\x80-\xFF]/.freeze
# Same set plus the NUL byte (0x00); `unescape_unreserved` uses it instead
# of UNRESERVED_OR_UTF8 when null-byte sanitizing is enabled.
UNRESERVED_OR_UTF8_OR_NULL = /[A-Za-z0-9\-._~\x00\x80-\xFF]/.freeze
# Percent-decodes only the escape sequences that are safe to decode.
#
# RFC3986, 2.2 states that the characters from 'reserved' group must be
# protected during normalization (which is what UTF8Sanitizer does).
# However, the regexp approach used by URI.unescape is not sophisticated
# enough for our task, so every %XX sequence is inspected individually:
# it is replaced by its byte when that byte is in the decodable set and
# left as written otherwise.
#
# Returns a new string; +input+ is not modified.
def unescape_unreserved(input)
  # The decodable set depends only on configuration, so it is selected once
  # rather than once per escape sequence.
  decodable = @sanitize_null_bytes ? UNRESERVED_OR_UTF8_OR_NULL : UNRESERVED_OR_UTF8
  input.gsub(/%([a-f\d]{2})/i) do |encoded|
    # encoded is "%XX"; the two characters after the percent sign are the hex digits.
    decoded = encoded[1, 2].hex.chr
    decodable.match?(decoded) ? decoded : encoded
  end
end
# This regexp matches unsafe characters, i.e. everything except 'reserved'
# and 'unreserved' characters from RFC3986 (2.3), and additionally '%',
# as percent-encoded unreserved characters could be left over from the
# `unescape_unreserved` invocation.
#
# It is the set of characters that `escape_unreserved` percent-encodes.
#
# See also URI::REGEXP::PATTERN::{UNRESERVED,RESERVED}.
UNSAFE = /[^\-_.!~*'()a-zA-Z\d;\/?:@&=+$,\[\]%]/
# Percent-encodes every character of +input+ that UNSAFE matches; the
# counterpart of `unescape_unreserved`. Unlike that method, no hand-written
# logic is needed here: the escaping routine of the default URI parser is
# reused.
def escape_unreserved(input)
  uri_parser = URI::DEFAULT_PARSER
  uri_parser.escape(input, UNSAFE)
end
# Returns a UTF-8 version of +input+.
#
# Non-String values are returned untouched. A String is duplicated and
# retagged as UTF-8; the copy is returned as-is when it is valid (and, with
# null-byte sanitizing enabled, free of NUL bytes). Otherwise the copy is
# handed to the configured strategy and the strategy's result is returned.
def sanitize_string(input)
  return input unless input.is_a?(String)

  candidate = input.dup.force_encoding(Encoding::UTF_8)
  # Validity is checked first so the regexp is never run on invalid bytes.
  needs_strategy = !candidate.valid_encoding? ||
                   (@sanitize_null_bytes && candidate.match?(NULL_BYTE_REGEX))
  return candidate unless needs_strategy

  @strategy.call(candidate, sanitize_null_bytes: @sanitize_null_bytes)
end
# Returns +to+, frozen first when +from+ is frozen, so a sanitized value
# keeps the frozen state of the value it replaces.
def transfer_frozen(from, to)
  from.frozen? ? to.freeze : to
end
# The UTF-8 byte order mark as a frozen binary string, and its length in bytes.
UTF8_BOM = "\xef\xbb\xbf".b.freeze
UTF8_BOM_SIZE = UTF8_BOM.bytesize
# Returns +input+ without a leading UTF-8 byte order mark.
#
# When +input+ does not start with the mark it is returned unchanged (the
# same object); otherwise a new string holding the remaining bytes is
# returned.
def strip_byte_order_mark(input)
  # Compare the leading bytes as binary data. `start_with?` with the binary
  # BOM raises Encoding::CompatibilityError when +input+ is tagged with
  # another encoding and contains non-ASCII characters.
  return input unless input.byteslice(0, UTF8_BOM_SIZE).b == UTF8_BOM

  input.byteslice(UTF8_BOM_SIZE..-1)
end
end
end