Let Utf8JsonReader process input with one complete JSON document per line #33030
Code example of workaround:

open System
open System.IO
open System.Text.Json

[<EntryPoint>]
let main argv =
    let options = JsonReaderOptions ()
    let input = File.ReadAllBytes "bigdata.json"
    let bytes = ReadOnlyMemory input

    let mutable objectsSeen = 0
    let mutable tokensSeen = 0

    for i = 1 to 1000 do
        let rec processAllLines offset =
            let bytes = bytes.Span
            // Advance the span to the next available object
            let bytes = bytes.Slice offset
            let mutable reader = Utf8JsonReader (bytes, options)

            objectsSeen <- objectsSeen + 1

            let mutable level = 0
            let mutable cont = true
            // Consume a single line that is assumed to contain a single object.
            // reader is a ref struct which prevents usage of tail-rec :-(
            // Not really a bug in Utf8JsonReader but rather in the F#
            // story around ref structs
            while cont && reader.Read () do
                tokensSeen <- tokensSeen + 1
                match reader.TokenType with
                | JsonTokenType.StartArray
                | JsonTokenType.StartObject -> level <- level + 1
                | JsonTokenType.EndArray
                | JsonTokenType.EndObject ->
                    level <- level - 1
                    if level = 0 then
                        cont <- false
                | _ -> ()

            let offset = offset + int reader.BytesConsumed

            // After consuming the line there are still trailing non-printable
            // characters; consume them in order to be able to tell if we
            // reached the end of input
            let rec consumeEndOfLine o =
                if o < input.Length && input.[o] < 32uy then
                    consumeEndOfLine (o + 1)
                else
                    o

            let offset = consumeEndOfLine offset

            // Done?
            if offset < input.Length then
                processAllLines offset

        processAllLines 0

    printfn "Objects seen : %d" objectsSeen
    printfn "Tokens seen : %d" tokensSeen

    0
From @Int32Overflow in #36750
... at the very least, using
I have a similar issue, and it may fit here: I am reading UTF-8 JSON data encapsulated in JavaScript from a web server. I've created a stream that skips to the JSON data (according to a specific preamble the JSON data is prefixed with). After the JsonDocument is read, I get an exception that there is data after the object's last token.

var jsonData = Encoding.UTF8.GetBytes("{\"d\":\"test\"}}}}}"); // A JSON object with additional data
// Wanted way
using var memoryStream = new MemoryStream(jsonData); // In my case this is a file stream or HTTP content stream
var document = await JsonDocument.ParseAsync(memoryStream); // FAILS!!
var jsonElement = document.RootElement;
// The "normal" way
var document = JsonDocument.Parse(jsonData.AsMemory()); // FAILS!!
var jsonElement = document.RootElement;
// Workaround:
var jsonReader = new Utf8JsonReader(jsonData.AsSpan(), isFinalBlock: true, default);
var jsonElement = (JsonElement)JsonSerializer.Deserialize<object>(ref jsonReader);

In the first and second sample, I get an exception. If I try to deserialize the object instead, it works fine, and additional data is ignored. The workaround is a considerable performance loss, as I have to read the web page and data I'm not interested in. I appreciate any help you can provide.
using var memoryStream = new MemoryStream(jsonData); // In my case this is a file stream or HTTP content stream

If your stream is already skipping to the start, why not have it close at the end? Change the stream to look for the end of the JSON section and return EOF there.

// The "normal" way
var document = JsonDocument.Parse(jsonData.AsMemory()); // FAILS!!

Something similar is possible here, since memory and sequence objects can take a range, allowing you to extract just that portion.
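A minimal sketch of that slicing idea (endOffset is a hypothetical, already-located end position of the JSON section, not something the API finds for you):

// Sketch: parse only the known JSON portion of the buffer.
var document = JsonDocument.Parse(jsonData.AsMemory(0, endOffset));
var jsonElement = document.RootElement;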
As a workaround, I've already done this, but scanning each "block" of bytes for the specific end sequence where the JSON ends has significant overhead. I have migrated from Json.NET, where this was possible without any additional overhead:

using var streamReader = new StreamReader(someHttpResponseStream); // already skipped to start of JSON
using var jsonTextReader = new JsonTextReader(streamReader);
var data = await JObject.LoadAsync(jsonTextReader);

I've also checked other JSON serializers which allow the same (Utf8Json, Json.NET, ServiceStack.Text).
This issue has been marked as needing an amended proposal. When ready to submit an amended proposal, please ensure that the original post in this issue has been updated, following the API proposal template and examples as provided in the guidelines.
Is there any progress on this feature as of today?
I solved a similar case where I received JSON objects from a websocket stream, where the sender concatenates multiple JSON objects into a single websocket message. Maybe it helps. The trick here is to check for the root object's '{' token, then skip all children (TrySkip); if that fails, the buffer contains incomplete data and needs more. If it succeeds, the payload will contain one single JSON object and you can pass it to another Utf8JsonReader for 'actual' parsing.

private bool TryParseMessage(ref ReadOnlySequence<byte> buffer, [NotNullWhen(returnValue: true)] out WebSocketMessage? message)
{
try
{
// Bitvavo sends multiple events in one message, like: { "event" : "candle"...}{ "event" : "ticker"...}
// Split these messages by starting at the first json object and skipping all its children
// if we can't skip, we need more data, since the object is not yet fully in the buffer
// As a side-effect, we actually don't care about individual websocket frames or messages anymore, we request more raw data until we have a valid json object
var rdr = new Utf8JsonReader(buffer, isFinalBlock: false, state: default);
if (rdr.Read())
{
if (rdr.TokenType != JsonTokenType.StartObject)
{
throw new JsonException("Invalid JSON, must start with an object");
}
if (!rdr.TrySkip())
{
// Need more data
message = null;
return false;
}
ReadOnlySequence<byte> payload;
if (buffer.GetOffset(rdr.Position) >= buffer.GetOffset(buffer.End))
{
// Skipped to end of buffer and have a single message
payload = buffer;
buffer = buffer.Slice(buffer.End);
}
else
{
// Still have data in the buffer, slice off the payload and set buffer to the remaining data
payload = buffer.Slice(0, rdr.Position);
buffer = buffer.Slice(rdr.Position);
}
message = _messageParser.ParseMessage(payload);
if (message != null)
{
return true;
}
// No valid message
message = new UnknownWebSocketMessage()
{
Payload = Encoding.UTF8.GetString(payload)
};
return true;
}
}
catch (JsonException)
{
// TODO: log/put in message
}
// No valid json/event, fallback to plain text
message = new UnknownWebSocketMessage()
{
Payload = Encoding.UTF8.GetString(buffer)
};
buffer = buffer.Slice(buffer.End);
return true;
}
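A minimal illustrative driver for TryParseMessage above, over a fixed concatenated payload (the WebSocket plumbing and _messageParser setup are elided; the payload contents and handling are assumptions):

// Illustrative only: feed TryParseMessage a buffer holding two concatenated documents.
var buffer = new ReadOnlySequence<byte>("{\"event\":\"candle\"}{\"event\":\"ticker\"}"u8.ToArray());
while (!buffer.IsEmpty && TryParseMessage(ref buffer, out WebSocketMessage? message))
{
    Console.WriteLine(message); // hypothetical handling of each parsed message
}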
@remcoros I think we are working on projects with similar websocket behavior! :D Thanks for sharing that piece of code; it helped me write my own version based on yours. My use-case was different (I had to return a list of objects instead of parsing them inside the function itself) but most of the code carried over. Thank you!!! P.S. Hard to believe this isn't a core feature of System.Text.Json! :/
Skipping means you need the whole JSON object in memory for it to succeed. Also, because Utf8JsonReader itself does not support streams as input, it would be nice if you could deserialize to a sequence of objects in cases like networked streams. They could just as well not be \r\n delimited; it should be doable in theory.
I would also appreciate something similar to this. Perhaps an IAsyncEnumerable that can read a whole "root object" one at a time? My use cases are:
@TheXenocide You may be able to use JsonSerializer.DeserializeAsyncEnumerable. Here's an example use from a StackOverflow answer that sounds like your use case:
using var file = File.OpenRead(path);
var items = JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(file);
await foreach (var item in items)
{
// Process JSON object
}
That works well for well-formed JSON, but it doesn't work for NDJSON, which is popular in big-data pipelines and mandatory if you want to support most append-only scenarios.
Yeah, the streams I'm reading come from append-only scenarios that do not have array tokens at the beginning or end of the file, though I suppose that could be provided by a relatively trivial wrapper stream if it's required (a sketch of that idea follows). I'll see if I can find some time to try this out sometime soon.
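A minimal sketch of that wrapper-stream idea, assuming NDJSON input with exactly one newline between documents (NDJSON escapes newlines inside strings, so bare '\n' bytes are always separators); NdjsonToArrayStream is a hypothetical name, not a library type. Pair it with JsonSerializerOptions { AllowTrailingCommas = true } so a trailing newline is tolerated:

using System;
using System.IO;

// Hypothetical wrapper: presents an NDJSON stream as a single JSON array by
// emitting '[', turning bare '\n' separators into ',', and emitting ']' at EOF.
sealed class NdjsonToArrayStream : Stream
{
    private readonly Stream _inner;
    private bool _opened, _closed;

    public NdjsonToArrayStream(Stream inner) => _inner = inner;

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0) return 0;
        if (!_opened) { _opened = true; buffer[offset] = (byte)'['; return 1; }
        int read = _inner.Read(buffer, offset, count);
        if (read == 0)
        {
            if (_closed) return 0;
            _closed = true; buffer[offset] = (byte)']'; return 1;
        }
        for (int i = offset; i < offset + read; i++)
            if (buffer[i] == (byte)'\n') buffer[i] = (byte)','; // newline => element separator
        return read;
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}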
@remcoros, thanks for your idea of the input buffer slicing. I am working with WebSockets too and have encountered the same problem; your example helped me a lot in finding a way to solve it!
Can use:

using System.Buffers;
using System.IO.Pipelines;
using System.Text.Json;
var stream = new MemoryStream("{\"id\":67,\"jsonrpc\":\"2.0\"}[{\"id\":68,\"jsonrpc\":\"2.0\"},{\"id\":69,\"jsonrpc\":\"2.0\"}]{\"id\":70,\"jsonrpc\":\"2.0\"}"u8.ToArray());
var reader = PipeReader.Create(stream);
await foreach (var jsonDocument in ParseJson(reader))
{
Console.WriteLine(jsonDocument.RootElement.ToString());
jsonDocument.Dispose();
}
static async IAsyncEnumerable<JsonDocument> ParseJson(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (!buffer.IsEmpty && TryParseJson(ref buffer, out JsonDocument jsonDocument))
{
yield return jsonDocument;
}
if (result.IsCompleted)
{
break;
}
reader.AdvanceTo(buffer.Start, buffer.End);
}
reader.Complete();
}
static bool TryParseJson(ref ReadOnlySequence<byte> buffer, out JsonDocument jsonDocument)
{
var reader = new Utf8JsonReader(buffer, isFinalBlock: false, default);
if (JsonDocument.TryParseValue(ref reader, out jsonDocument))
{
buffer = buffer.Slice(reader.BytesConsumed);
return true;
}
return false;
}
@benaadams's solution worked nicely for me. In my case, I needed to read from a file stream instead of a memory stream; otherwise it was exactly the same.

var path = "./manifest.ndjson";
await using FileStream fileStream = new(path, FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize: 4096, useAsync: true);
...

If reading the fileStream synchronously, this simpler method call also works:

var path = "./manifest.ndjson";
using var fileStream = File.OpenRead(path);
...
Awesome solution! Thanks for sharing, Ben. I used it to migrate from Newtonsoft.Json to System.Text.Json in Docker.DotNet.
I'm also interested in JSONL handling, but from the writer's side. I was looking for a way to produce the JSONL/NDJSON necessary for Amazon Athena (Apache Hive).
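The writer side needs no new API; a minimal sketch (WriteJsonLines is a hypothetical helper, not a System.Text.Json method) is to serialize each item compactly and append a newline byte:

using System.Collections.Generic;
using System.IO;
using System.Text.Json;

// Hypothetical helper: emits one JSON document per line (JSONL/NDJSON).
// Assumes options does not enable WriteIndented, which would break the format.
static void WriteJsonLines<T>(Stream output, IEnumerable<T> items, JsonSerializerOptions? options = null)
{
    foreach (T item in items)
    {
        JsonSerializer.Serialize(output, item, options);
        output.WriteByte((byte)'\n'); // newline-delimit each document
    }
}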
I think it is very odd to prefer having to parse JSON twice instead of just implementing an option that doesn't throw if there is data left after parsing. All it would take is calling deserialize in a loop with options that specify the do-not-throw setting. As it stands, even though a fair bit of this exists in some internal class, you have to go find it in dotnet/runtime and copy it wholesale into your project to get such a simple thing done.
Just add an option that doesn't try to read everything beyond the object it's provided (a sketch of the loop is below).
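A minimal sketch of that deserialize-in-a-loop idea over a fully buffered payload, restarting the reader after each value so no multi-value option is needed (DeserializeAll is a hypothetical helper):

using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper: reads consecutive root-level JSON values by creating a
// fresh Utf8JsonReader over the unconsumed tail after each value.
static List<T> DeserializeAll<T>(ReadOnlyMemory<byte> utf8, JsonSerializerOptions? options = null)
{
    var results = new List<T>();
    int offset = 0;
    while (offset < utf8.Length)
    {
        var reader = new Utf8JsonReader(utf8.Span.Slice(offset), isFinalBlock: true, state: default);
        results.Add(JsonSerializer.Deserialize<T>(ref reader, options)!);
        offset += (int)reader.BytesConsumed;
        // Skip inter-document whitespace so a trailing newline doesn't throw
        while (offset < utf8.Length && utf8.Span[offset] is (byte)' ' or (byte)'\t' or (byte)'\r' or (byte)'\n')
            offset++;
    }
    return results;
}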
AI-related use cases:
In both cases this can be addressed by having some API like:

T nextItem = await JsonSerializer.ReadNextAsync<T>(stream);

or:
@stephentoub pointed out that the
I guess one other possible API design is like this:

using var readContext = new JsonStreamReadContext(stream); // TODO: better naming
while (await JsonSerializer.ReadNextAsync<T>(readContext) is {} nextItem)
{
// ...
}

... as this would give a place to track read-but-not-yet-consumed data from the stream. Not saying it's any better than
Here is an API proposal based on a prototype I've been working on:

namespace System.Text.Json;
public partial struct JsonReaderOptions
{
public bool AllowTrailingContent { get; set; }
}
public partial class JsonSerializerOptions
{
public bool AllowTrailingContent { get; set; }
}
namespace System.Text.Json.Serialization;
public partial class JsonSourceGenerationOptionsAttribute
{
public bool AllowTrailingContent { get; set; }
}

API Usage

Enabling the setting endows the reader with the ability to read multiple root-level JSON values:

var reader = new Utf8JsonReader("null {} 1 \r\n [1,2,3]"u8, new() { AllowTrailingContent = true });
reader.Read();
Console.WriteLine(reader.TokenType); // Null
reader.Read();
Console.WriteLine(reader.TokenType); // StartObject
reader.Skip();
reader.Read();
Console.WriteLine(reader.TokenType); // Number
reader.Read();
Console.WriteLine(reader.TokenType); // StartArray
reader.Skip();
Console.WriteLine(reader.Read()); // False

This additionally makes it possible to read JSON from payloads that may contain trailing data that is invalid JSON:

var reader = new Utf8JsonReader("[1,2,3] <NotJson/>"u8, new() { AllowTrailingContent = true });
reader.Read();
reader.Skip(); // Success
reader.Read(); // throws JsonReaderException

The equivalent serializer setting makes deserialization ignore trailing content:

JsonSerializerOptions options = new() { AllowTrailingContent = true };
JsonSerializer.Deserialize<int[]>("[1,2,3] { }[]{}", options); // Success
JsonSerializer.Deserialize<int[]>("[1,2,3] <!NOT JSON!>", options); // Success

New DeserializeAsyncEnumerable overloads

The following APIs make it possible to stream multiple root-level JSON values using IAsyncEnumerable:

namespace System.Text.Json;
+public enum JsonDeserializeAsyncEnumerableMode
+{
+ Array = 0,
+ RootLevelValues = 1,
+}
public static partial class JsonSerializer
{
public static IAsyncEnumerable<T> DeserializeAsyncEnumerable<T>(Stream utf8Json, JsonSerializerOptions options = null, CancellationToken cancellationToken = default);
public static IAsyncEnumerable<T> DeserializeAsyncEnumerable<T>(Stream utf8Json, JsonTypeInfo<T> jsonTypeInfo, CancellationToken cancellationToken = default);
+ public static IAsyncEnumerable<T> DeserializeAsyncEnumerable<T>(Stream utf8Json, JsonDeserializeAsyncEnumerableMode mode, JsonSerializerOptions options = null, CancellationToken cancellationToken = default);
+ public static IAsyncEnumerable<T> DeserializeAsyncEnumerable<T>(Stream utf8Json, JsonTypeInfo<T> jsonTypeInfo, JsonDeserializeAsyncEnumerableMode mode, CancellationToken cancellationToken = default);
}

Which enables scenarios like the following:

string json = """[0] [0,1] [0,1,1] [0,1,1,2] [0,1,1,2,3]""";
using var stream = new MemoryStream(Encoding.UTF8.GetBytes(json));
await foreach(int[] item in JsonSerializer.DeserializeAsyncEnumerable<int[]>(stream, JsonDeserializeAsyncEnumerableMode.RootLevelValues))
{
Console.WriteLine(item.Length);
}
Thanks, that looks pretty good. For the JsonDeserializeAsyncEnumerableMode, would we imagine a future where you could opt into both RootLevelValues and Array, i.e. it'd yield top-level objects, but if they were arrays it would instead enumerate their contents? I don't know how valuable that would be, but it would impact the shape of the enum.
Probably not, and that's because of the ambiguity arising from the case where the element types themselves serialize as arrays. E.g. it would be unclear if
namespace System.Text.Json
{
public partial struct JsonReaderOptions
{
public bool AllowMultipleValues { get; set; }
}
public static partial class JsonSerializer
{
public static IAsyncEnumerable<T> DeserializeAsyncEnumerable<T>(Stream utf8Json, bool topLevelValues, JsonSerializerOptions options = null, CancellationToken cancellationToken = default);
public static IAsyncEnumerable<T> DeserializeAsyncEnumerable<T>(Stream utf8Json, JsonTypeInfo<T> jsonTypeInfo, bool topLevelValues, CancellationToken cancellationToken = default);
}
}
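A sketch of how the approved shape might be used, assuming it ships as proposed (the file name here is hypothetical):

using var stream = File.OpenRead("bigdata.ndjson"); // hypothetical NDJSON input
await foreach (JsonElement item in JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(stream, topLevelValues: true))
{
    Console.WriteLine(item); // one root-level document at a time
}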
EDIT: see #33030 (comment) for an API proposal.
Apologies if this ticket already exists and I missed it, or if there's a flag I missed. I looked around but didn't find a good match.
I work in BigData and a common scenario is that we have large files that contain JSON documents separated by newlines, like so (illustrative sample):
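{"id":1,"value":"a"}
{"id":2,"value":"b"}
{"id":3,"value":"c"}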
This is obviously not a valid JSON document, but it would help me a lot if Utf8JsonReader could be configured to process a sequence of JSON documents like this. My alternatives are to read line by line (forcing me to pre-parse the data and thus lose performance) or to add extra complexity when iterating over the objects so I can keep track of when an object is done and I should continue to the next.

Neither is very attractive to me, and IMHO, since this is a quite common scenario, it would make sense to add it, especially since the parser already supports relaxed parsing with respect to trailing commas and comments.