Eventsource cannot handle large payloads #20
Comments
This is my current idea for a minimal fix, but I need to test it against a few prod systems over the next few days to see if it's correct. The gist of the idea is to introduce a cursor to prevent re-reading chunks.
This was brought to our attention recently via a support request. I believe your description is accurate, and I understand it now better than I did the original report. I'm not sure I understand this part, though:
By "response" do you mean a single SSE event? The stream connection normally remains open, so all downloaded data from LaunchDarkly makes up a single HTTP response. But it is true that the first SSE event sent on the stream (regardless of whether it's the Ruby SDK or another SDK) contains the full flag data. You also mentioned "large clients", which was a bit confusing, but I'm guessing that what you meant there was "large LaunchDarkly data sets".
Yeah, that was poorly worded, let me try that one again. From observation I believe that the initial flag data provided to If that was a single message, non-streamed/chunked, it'd be a bit expensive to deserialize that much JSON. When the response is streamed and chunked there won't be a record break ( The experiment I'm working on right now introduces a cursor to not traverse already read chunks as a quicker fix. In the initial message I'd used this example, where every
buffer = ''
buffer.index(/[\r\n]/)
buffer = '-'
buffer.index(/[\r\n]/)
buffer = '--'
buffer.index(/[\r\n]/)
# ...
buffer = '-' * 150
buffer.index(/[\r\n]/)
The fix tries to do this instead:
buffer = ''
buffer.index(/[\r\n]/)
previous_buffer_size = buffer.size
buffer = '-'
buffer.index(/[\r\n]/, previous_buffer_size)
previous_buffer_size = buffer.size
buffer = '--'
buffer.index(/[\r\n]/, previous_buffer_size)
# ...
previous_buffer_size = buffer.size
buffer = '-' * 150
buffer.index(/[\r\n]/, previous_buffer_size)
...so it'll start scanning where the last
To be fair, this had me scratching my head for a good week or so trying to figure out what was up, and it was hard to think of a good way to approach or fix it, so it took a bit to write this ticket. Sorry about the delay there.
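A minimal sketch of the cursor idea described above, for illustration only. `CursorLineBuffer` and `push` are hypothetical names, not part of ld-eventsource, and the sketch assumes lines end in "\n" only (real SSE also allows "\r" and "\r\n"). The point is that each appended chunk is searched once, starting where the previous search stopped.

```ruby
# Hypothetical helper for illustration; not the actual ld-eventsource code.
# Assumption: lines are terminated by "\n" only.
class CursorLineBuffer
  def initialize
    @buffer = +''
    @scan_from = 0 # index of the first character that has not been searched yet
  end

  # Appends a chunk and returns every complete line that is now available.
  def push(chunk)
    @buffer << chunk
    lines = []
    # Start searching at the cursor instead of at index 0.
    while (i = @buffer.index("\n", @scan_from))
      lines << @buffer.slice!(0..i).chomp
      @scan_from = 0 # the remainder of the buffer has not been searched yet
    end
    # No terminator left in the buffer, so the next search can skip all of it.
    @scan_from = @buffer.size
    lines
  end
end
```

With this shape, the work per chunk depends on the chunk size rather than on how much data has already been buffered.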
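I'd run some isolated tests to see if I could see just how much impact that
require 'benchmark'
chunk = '-' * 1000
# 149,850 chunks without a line break, followed by one chunk that ends in "\n"
chunks = [*(([chunk] * 999) * 150), chunk + "\n"]
chunks.join.bytesize.fdiv 10**6 # bytes -> MB
# => 149.851001 (MB)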
# Full scan: every iteration searches the whole buffer from index 0
Benchmark.measure do
  start_time = Time.now
  previous_time = 0.0
  buffer = ''
  chunks.each_with_index do |c, i|
    if i % 1000 == 0
      # Seconds elapsed since the start of the run
      current_time = Time.now - start_time
      puts "i: #{i}, delta_time: #{(current_time - previous_time).round(4)}, time: #{current_time.round(4)}"
      previous_time = current_time
    end
    buffer << c
    break true if buffer.index("\n")
  end
end
# i: 1000, delta_time: 0.0144, time: 0.0144
# i: 2000, delta_time: 0.0388, time: 0.0533
# i: 3000, delta_time: 0.051, time: 0.1043
# ...
# i: 43000, delta_time: 2.4448, time: 46.5457
# i: 44000, delta_time: 2.4821, time: 49.0279
# i: 45000, delta_time: 2.624, time: 51.6519
# ...
# i: 147000, delta_time: 9.4535, time: 648.7023
# i: 148000, delta_time: 9.5025, time: 658.2048
# i: 149000, delta_time: 9.0586, time: 667.2634
# Positional scan: every iteration searches only the newly appended chunk
Benchmark.measure do
  start_time = Time.now
  previous_time = 0.0
  buffer = ''
  chunks.each_with_index do |c, i|
    if i % 1000 == 0
      # Seconds elapsed since the start of the run
      current_time = Time.now - start_time
      puts "i: #{i}, delta_time: #{(current_time - previous_time).round(4)}, time: #{current_time.round(4)}"
      previous_time = current_time
    end
    pos = buffer.size # where the new chunk will start
    buffer << c
    break true if buffer.index("\n", pos)
  end
end
# i: 142000, delta_time: 0.0007, time: 0.1223
# i: 143000, delta_time: 0.0007, time: 0.123
# i: 144000, delta_time: 0.0008, time: 0.1238
# i: 145000, delta_time: 0.0007, time: 0.1245
# i: 146000, delta_time: 0.0007, time: 0.1252
# i: 147000, delta_time: 0.0007, time: 0.1259
# i: 148000, delta_time: 0.0007, time: 0.1266
# i: 149000, delta_time: 0.0007, time: 0.1274
667.2634 / 0.1274
# => roughly 5237.5 (times faster)
This looks to be a viable fix to try, and I'll test it with our production servers on Monday. Any thoughts?
It looks plausible at first glance; we'll just need to review it in more detail.
Looking at the branch with your proposed fix, there's one thing I'm not sure is right. It's here, where you are moving It seems to me that a more logical place to do this would be within
Either way, I think there's an edge case where it could end up being wrong, but it's an edge case that I think the existing code is already vulnerable to. SSE allows the line break to be
(Edge cases like that are a common pitfall in writing an SSE implementation; I'm pretty sure we ran into this one before on another platform, but missed it in Ruby. For that reason, we're looking into implementing a standardized set of acceptance/contract tests to be run against all of our SSE implementations, separate from the higher-level SDK testing.)
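To make one such edge case concrete: the SSE format accepts "\r\n", "\n", or "\r" as a line terminator, so a "\r\n" pair can be split across two chunks. The sketch below is an assumption about how a cursor-based scanner could cope with that, not the code from the branch or from ld-eventsource. `SseLineSplitter` is a hypothetical name. When the buffer ends in "\r", the scanner waits for the next chunk before deciding whether the terminator is "\r" or "\r\n".

```ruby
# Hypothetical sketch for illustration; not the ld-eventsource implementation.
class SseLineSplitter
  def initialize
    @buffer = +''
    @scan_from = 0 # index of the first character that still needs searching
  end

  # Appends a chunk and returns the complete lines found so far.
  def push(chunk)
    @buffer << chunk
    lines = []
    loop do
      i = @buffer.index(/[\r\n]/, @scan_from)
      break if i.nil?

      terminator_size = 1
      if @buffer[i] == "\r"
        if i == @buffer.size - 1
          # A trailing "\r" may be the first half of "\r\n": keep the cursor
          # on it and decide once more data has arrived.
          @scan_from = i
          return lines
        end
        terminator_size = 2 if @buffer[i + 1] == "\n"
      end

      lines << @buffer[0, i]
      @buffer.slice!(0, i + terminator_size)
      @scan_from = 0
    end
    @scan_from = @buffer.size
    lines
  end
end
```

One trade-off of this choice: a line that ends in a lone "\r" is only emitted after the next chunk arrives.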
I'm also not sure that our use of |
Yes, and to be clear, these are mostly theoretical changes that look decent on paper. I would have to vet edge cases as well, and you know far more than I do about how SSEs work. I still intend to give this a test, but other projects have been demanding attention lately. I believe the larger change, which has a much larger footprint, would be around sending the entire payload as one SSE versus potential sharding patterns for it, or filtering on the server side before the data is even sent down. My fear here is that we won't be the only client with that large of a payload in the future, so it may be a discussion worth having now.
Sending the payload as multiple events would be a much, much larger change, incompatible with all our existing SDKs and affecting multiple layers of the service infrastructure. It's unlikely that that will happen in the foreseeable future. Adding some kind of filtering mechanism is more feasible and it's been requested as a feature before.
Agreed that sharding payloads is a major breaking change. Server-side filtering would be far more likely and immediately useful, and if a change like this works it'll buy some more time to address it as an option. I know we've written filtering at a client-side level for data stores, but ideally the filtering is done on the server side before payloads are sent.
We've actually added filtering (at the flag data store interface level) in our Go and Java implementations, so that apps suffering from memory ballooning due to LD can explicitly specify the list of flags (or flag name prefixes) they care about. Everything is still downloaded initially, and streamed, but only the specified flags are kept. Our ideal client SDK would probably look something like this:
Segments would presumably be handled the same way as flags, with the possible optimization of including any segments mentioned in a wanted flag. The big question would be whether keeping the flag list per connection in Relay would be expensive. If it's a problem, some sort of copy-on-write/coalesce-on-collision system could allow all 50 instances of the same app to (eventually) share the same list of flags.
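As a rough illustration of the store-level filtering mentioned above (everything is still downloaded, but only the wanted flags are kept), here is a minimal Ruby sketch. `FilteredFlagStore` and its `init`, `upsert`, and `get` methods are hypothetical names chosen for the example; they are not the Go, Java, or Ruby SDK interfaces.

```ruby
require 'set'

# Hypothetical sketch for illustration; not an actual SDK data store interface.
class FilteredFlagStore
  # keys: exact flag keys to keep; prefixes: flag key prefixes to keep.
  def initialize(keys: [], prefixes: [])
    @keys = keys.to_set
    @prefixes = prefixes
    @flags = {}
  end

  def wanted?(key)
    @keys.include?(key) || @prefixes.any? { |prefix| key.start_with?(prefix) }
  end

  # Receives the full initial payload and keeps only the wanted flags.
  def init(all_flags)
    @flags = all_flags.select { |key, _flag| wanted?(key) }
  end

  # Applies a streamed update, ignoring flags that were not asked for.
  def upsert(key, flag)
    @flags[key] = flag if wanted?(key)
  end

  def get(key)
    @flags[key]
  end
end
```

This only bounds memory after the payload has been parsed; it does nothing for the download or parsing cost, which is why server-side filtering comes up in the discussion.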
If you're going to filter watched flags, I would love it if they could be filtered by tag. I feel like prefixes are more of a workaround for what tags can do more effectively.
We might spin those discussions off into a separate ticket; I don't want to overload this one too much. Also, I added a Ruby implementation for local filtering, so that exists too @zellyn.
Filtering would be a general feature request in the entire product, not specific to the Ruby SDK. And it's a request we've received before, so there's no need to file it again; I'll just make a note that there's more interest in it. Let's table that here, so that this GitHub issue on
(Although, @zellyn, I do want to address one thing you mentioned above, as something that would probably not be in any future filtering implementation: the idea that if the client needed a flag it had not previously received, it would fetch the flag from LD or from Relay at that time. We've deliberately never implemented anything in the SDKs that would involve making HTTP requests to the service endpoints on demand during a flag evaluation; that is just too much latency and unpredictability. The SDKs do all of their HTTP dialogue in the background, updating their internal state accordingly, so that flag evaluations can simply reference that internal state. The only exception is that in server-side SDKs which have database integrations, some of that state might need to be requeried from a database if it's not currently cached.)
@baweaver We're reviewing a PR now that has a number of improvements to stream reading, including the one you suggested. We should be able to release the fix this week.
Absolutely. Our internal home-built feature flag system works exactly the same way: all evaluations are local, based on asynchronously updated rules that can be evaluated. I don't think one synchronous evaluation for a flag would hurt, although it would also be acceptable to return the default, log a warning (so that clients can add the flag to the list of initially-requested flags), and fetch the value asynchronously.
Indeed, it seems more and more likely that this will become a requirement for Square to be able to successfully use LaunchDarkly.
@baweaver The changes in that PR have been released and the improved SSE client is now in the 6.2.5 release of the Ruby SDK. Please let us know if you see improvements in performance after updating to this version.
Closing this because implementation changes in the last several releases have made it obsolete.
Brief Summary
The Ruby Server SDK calls Eventsource with large payloads of JSON data, causing Rails Puma workers to time out at the default 90s timeout. We have identified likely causes, which are highlighted below.
Buffer Indexing
The code for handling streaming responses chunks on record breaks (/\r\n/ approx), but uses a mechanism whose cost can grow quadratically on large data rows:
Specifically this line is worrying:
When LD is initialized I believe that it tries to send the entirety of the payload in one line of data, but server responses are chunked streams. If the full response is 150 MB (not theoretical) and the chunk size is, say, 1 MB (a guess, for the sake of example), that means it won't hit a record break until 150 MB.
In other words, it does this (- is one chunk):
Repeat until you get to 150 MB, from what I think might be happening, and you've done a substantial number of reads over the same data.
The problem is that eventsource is reading the entirety of the buffer and storing the entirety of the response, rather than checking for record breaks in each chunk.
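A back-of-the-envelope calculation of that repeated reading, using the figures above (150 MB payload, and the 1 MB chunk size that was only a guess). The helper names are hypothetical. If every new chunk triggers a search of the whole buffer, the total data searched is 1 + 2 + ... + 150 MB, which grows quadratically with the number of chunks.

```ruby
# Illustrative arithmetic only; the 1 MB chunk size is an assumption.

# Total MB searched when the whole buffer is re-scanned after every chunk.
def full_rescan_mb(total_mb, chunk_mb)
  chunks = total_mb / chunk_mb
  (1..chunks).sum { |n| n * chunk_mb }
end

# Total MB searched when each chunk is only searched once.
def per_chunk_scan_mb(total_mb, _chunk_mb)
  total_mb
end

puts full_rescan_mb(150, 1)    # 11325
puts per_chunk_scan_mb(150, 1) # 150
```

Smaller chunks make the gap wider, since the number of re-scans goes up while the payload size stays the same.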
Stream Init
Why does that become an issue? I believe this chunk of code is relevant:
...and that won't be triggered by conn.on_event { |event| process_message(event) } until that entire JSON payload is processed from chunks.
Summary
We believe that the Ruby Server SDK is trying to retrieve the entirety of the flag data as a single response from Ruby Eventsource, and for large clients (150 MB+) this will cause crashes.
Current experimentation I've been working on has been to prevent full reads of a partial buffer, but I believe this reflects more on the entirety of flag data being sent in a single message. If it truly is all being sent in a single response this will be a severe issue for larger clients.