-
Notifications
You must be signed in to change notification settings - Fork 216
Streaming
To handle cases where a large POST body must be processed (ex: file upload), or if you want to run some validation on the supplied request headers prior to processing the rest of the request, you can define on_headers(env, headers)
and on_body(env, chunk)
methods within your API which will invoked when the headers are first parsed, or as new data arrives to the server.
require 'goliath'
class Stream < Goliath::API
def on_headers(env, headers)
env.logger.info 'received headers: ' + headers.inspect
env['async-headers'] = headers
end
def on_body(env, data)
env.logger.info 'received data: ' + data
(env['async-body'] ||= '') << data
end
def on_close(env)
env.logger.info 'closing connection'
end
def response(env)
[200, {}, {body: env['async-body'], head: env['async-headers']}]
end
end
When the on_body
callback is defined within the API, the data is streamed to the application and must be processed immediately (in example above, we store it and echo it back) – response(env)
is invoked when all the data arrives and HTTP request is complete, but params
won’t contain the body (we want to avoid buffering large files within the server). It is up to you to define the desired behavior: you can either process the data in chunks, or you can stream the data and process it at the end of the request.
Need to provide a firehose of updates to your clients? Goliath can efficiently stream the data to your clients:
require 'goliath'
class Stream < Goliath::API
def response(env)
i = 0
pt = EM.add_periodic_timer(1) do
env.stream_send("#{i}\n")
i += 1
end
EM.add_timer(10) do
pt.cancel
env.stream_send("!! BOOM !!\n")
env.stream_close
end
[200, {}, Goliath::Response::STREAMING]
end
end
In the example above, when the client connects, the server will return a 200 code and a special Goliath::Response::STREAMING
response which will indicate to the server that it should not invoke any body post-processing after it sends the response headers.
Once the response header is sent, the connection is kept open and your callbacks have direct access to the underlying connection via the env.stream_send
function. In the example above, we setup a periodic 1 second timer, which emits a counter, and a second timer (10 s) which closes the connection.
Hence, when the client connects, it will see the response headers, and then receive 10 messages (1 through 10), followed by a “!! Boom !!”. Of course, instead of sending individual integers, you can stream JSON, XML, or any other format your application requires.
You can also stream media using Chunked Transfer Encoding. The code is only subtly different:
require 'goliath'
class ChunkedStreaming < Goliath::API
def on_close(env)
env.logger.info "Connection closed."
end
def response(env)
i = 0
pt = EM.add_periodic_timer(1) do
env.chunked_stream_send("#{i}\n")
i += 1
end
EM.add_timer(10) do
pt.cancel
env.chunked_stream_send("!! BOOM !!\n")
env.chunked_stream_close
end
headers = { 'Content-Type' => 'text/plain', 'X-Stream' => 'Goliath' }
chunked_streaming_response(200, headers)
end
end
Goliath automatically adds the necessary Transfer Encoding headers and Byte size delimiters to the data stream, allowing the recipient to properly identify data chunks and handle them accordingly.