-
Notifications
You must be signed in to change notification settings - Fork 22
/
udp.rb
161 lines (133 loc) · 4.79 KB
/
udp.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# encoding: utf-8
require "date"
require "logstash/inputs/base"
require "logstash/namespace"
require "socket"
require "stud/interval"
require "ipaddr"
# Read messages as events over the network via udp. The only required
# configuration item is `port`, which specifies the udp port logstash
# will listen on for event streams.
#
class LogStash::Inputs::Udp < LogStash::Inputs::Base
  config_name "udp"

  default :codec, "plain"

  # The address which logstash will listen on.
  config :host, :validate => :string, :default => "0.0.0.0"

  # The port which logstash will listen on. Remember that ports less
  # than 1024 (privileged ports) may require root or elevated privileges to use.
  config :port, :validate => :number, :required => true

  # The maximum packet size to read from the network
  config :buffer_size, :validate => :number, :default => 65536

  # The socket receive buffer size in bytes.
  # If option is not set, the operating system default is used.
  # The operating system will use the max allowed value if receive_buffer_bytes is larger than allowed.
  # Consult your operating system documentation if you need to increase this max allowed value.
  config :receive_buffer_bytes, :validate => :number

  # Number of threads processing packets
  config :workers, :validate => :number, :default => 2

  # This is the number of unprocessed UDP packets you can hold in memory
  # before packets will start dropping.
  config :queue_size, :validate => :number, :default => 2000

  # Event field that receives the sender address when the codec did not set it.
  HOST_FIELD = "host".freeze

  # Marker pushed onto the worker queue in place of a payload to tell a
  # worker thread to leave its loop.
  WORKER_STOP = :END

  def initialize(params)
    super
    BasicSocket.do_not_reverse_lookup = true
  end

  def register
    @udp = nil
    @input_workers = nil
    @input_to_worker = nil
    @metric_errors = metric.namespace(:errors)
  end # def register

  # Starts the worker threads once, then runs the UDP listener until the
  # plugin is stopped. The workers are created here rather than in
  # udp_listener so that a listener restart does not spawn a second set of
  # threads and abandon the previous queue.
  def run(output_queue)
    @output_queue = output_queue
    start_workers
    listen_until_stopped(output_queue)
  ensure
    stop_workers
  end

  def close
    close_udp_socket
  end

  def stop
    close_udp_socket
  end

  private

  # Runs udp_listener; when it raises, logs the failure, counts it, waits
  # up to 5 seconds (interruptible by stop) and starts it again unless the
  # plugin has been stopped.
  def listen_until_stopped(output_queue)
    udp_listener(output_queue)
  rescue => e
    @logger.warn("UDP listener died", :exception => e, :backtrace => e.backtrace)
    @metric_errors.increment(:listener)
    Stud.stoppable_sleep(5) { stop? }
    retry unless stop?
  end

  # Creates the bounded hand-off queue and one decoding thread per
  # configured worker, each with its own clone of the codec.
  def start_workers
    @input_to_worker = SizedQueue.new(@queue_size)
    metric.gauge(:queue_size, @queue_size)
    metric.gauge(:workers, @workers)

    # Array.new collects the threads; Integer#times would only return the
    # count and lose the thread references.
    @input_workers = Array.new(@workers) do |i|
      @logger.debug("Starting UDP worker thread", :worker => i)
      Thread.new(i, @codec.clone) { |number, codec| inputworker(number, codec) }
    end
  end

  # Asks every worker to finish by queueing one stop marker per thread and
  # waits for them to exit. Packets queued ahead of the markers are still
  # decoded first. Does nothing when the workers were never started.
  def stop_workers
    return unless @input_workers && @input_to_worker

    @input_workers.size.times { @input_to_worker.push([WORKER_STOP, nil]) }
    @input_workers.each { |thread| thread.join }
    @input_workers = nil
  end

  # Binds a UDP socket on host:port and forwards every received datagram,
  # together with the sender information, to the worker queue until stop?
  # becomes true. Raises whatever the socket calls raise; the caller is
  # responsible for restarting.
  def udp_listener(output_queue)
    @logger.info("Starting UDP listener", :address => "#{@host}:#{@port}")

    if @udp && !@udp.closed?
      @udp.close
    end

    # Parse the configured address once and pick the matching socket family.
    family = IPAddr.new(@host).ipv6? ? Socket::AF_INET6 : Socket::AF_INET
    @udp = UDPSocket.new(family)

    # set socket receive buffer size if configured
    if @receive_buffer_bytes
      @udp.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, @receive_buffer_bytes)
    end

    # Read the size back so a value different from the requested one is reported.
    rcvbuf = @udp.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).unpack("i")[0]
    if @receive_buffer_bytes && rcvbuf != @receive_buffer_bytes
      @logger.warn("Unable to set receive_buffer_bytes to desired size. Requested #{@receive_buffer_bytes} but obtained #{rcvbuf} bytes.")
    end

    @udp.bind(@host, @port)
    @logger.info("UDP listener started", :address => "#{@host}:#{@port}", :receive_buffer_bytes => "#{rcvbuf}", :queue_size => "#{@queue_size}")

    until stop?
      # Wake up at least every 0.5s so a stop request is noticed.
      next if IO.select([@udp], [], [], 0.5).nil?

      # collect datagram messages and add to inputworker queue;
      # read at most queue_size datagrams before checking stop? again
      @queue_size.times do
        begin
          payload, client = @udp.recvfrom_nonblock(@buffer_size)
          break if payload.empty?
          @input_to_worker.push([payload, client])
        rescue IO::EAGAINWaitReadable
          # nothing more to read right now
          break
        end
      end
    end
  ensure
    shutdown_udp_socket
  end

  # Worker loop: takes [payload, client] pairs from the queue, decodes the
  # payload with this worker's codec and emits the resulting events.
  # An exception while handling one packet is logged and counted, and the
  # worker carries on with the next packet instead of terminating.
  # The loop ends when the stop marker is received.
  def inputworker(number, codec)
    LogStash::Util::set_thread_name("<udp.#{number}")

    loop do
      begin
        payload, client = @input_to_worker.pop
        break if payload == WORKER_STOP

        # client is the sender information returned by recvfrom_nonblock;
        # element 3 is used as the event's host value.
        host = client[3]
        codec.decode(payload) { |event| push_decoded_event(host, event) }
        codec.flush { |event| push_decoded_event(host, event) }
      rescue => e
        @logger.error("Exception in inputworker", "exception" => e, "backtrace" => e.backtrace)
        @metric_errors.increment(:worker)
      end
    end
  end

  # Decorates the event, fills in the host field when the codec left it
  # unset, hands the event to the output queue and counts it.
  def push_decoded_event(host, event)
    decorate(event)
    event.set(HOST_FIELD, host) if event.get(HOST_FIELD).nil?
    @output_queue.push(event)
    metric.increment(:events)
  end

  # Closes the listening socket if it is open. Errors raised by the close
  # are logged at debug level and otherwise ignored.
  def close_udp_socket
    @udp.close if @udp && !@udp.closed?
  rescue => e
    ignore_close_and_log(e)
  end

  # Shuts down the read and the write side of the socket after the listener
  # loop ends. Each call is attempted independently; errors are logged at
  # debug level and otherwise ignored.
  def shutdown_udp_socket
    return unless @udp

    [:close_read, :close_write].each do |half|
      begin
        @udp.public_send(half)
      rescue => e
        ignore_close_and_log(e)
      end
    end
  end

  def ignore_close_and_log(e)
    @logger.debug("ignoring close exception", "exception" => e)
  end
end