diff --git a/Cargo.toml b/Cargo.toml index cf46ee3..5cd5759 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,4 +3,5 @@ members = [ "streamson-bin", "streamson-lib", + "streamson-tokio", ] diff --git a/README.md b/README.md index a1da248..7b0b1f6 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,7 @@ A memory efficient set of tools to split large JSONs into a smaller parts. * [streamson-lib](streamson-lib/README.md) - Core Rust library * [streamson-bin](streamson-bin/README.md) - Binary to split JSONs +* [streamson-tokio](streamson-tokio/README.md) - Helpers to integrate streamson with tokio ## Motivation Imagine a situation when you get a very large JSON input. diff --git a/streamson-lib/src/error.rs b/streamson-lib/src/error.rs index c656cc4..4f79652 100644 --- a/streamson-lib/src/error.rs +++ b/streamson-lib/src/error.rs @@ -1,6 +1,6 @@ //! Module containing errors -use std::{error::Error, fmt, str::Utf8Error}; +use std::{error::Error, fmt, io, str::Utf8Error}; /// Matcher related errors #[derive(Debug, PartialEq, Clone)] @@ -68,12 +68,13 @@ impl fmt::Display for IncorrectInput { } } /// Handler related errors -#[derive(Debug, PartialEq, Clone)] +#[derive(Debug)] pub enum General { Handler(Handler), Matcher(Matcher), Utf8Error(Utf8Error), IncorrectInput(IncorrectInput), + IOError(io::Error), } impl Error for General {} @@ -84,30 +85,37 @@ impl fmt::Display for General { Self::Matcher(err) => err.fmt(f), Self::Utf8Error(err) => err.fmt(f), Self::IncorrectInput(err) => err.fmt(f), + Self::IOError(err) => err.fmt(f), } } } impl From for General { fn from(handler: Handler) -> Self { - General::Handler(handler) + Self::Handler(handler) } } impl From for General { fn from(matcher: Matcher) -> Self { - General::Matcher(matcher) + Self::Matcher(matcher) } } impl From for General { fn from(utf8: Utf8Error) -> Self { - General::Utf8Error(utf8) + Self::Utf8Error(utf8) } } impl From for General { fn from(incorrect_input: IncorrectInput) -> Self { - 
General::IncorrectInput(incorrect_input) + Self::IncorrectInput(incorrect_input) + } +} + +impl From for General { + fn from(io_error: io::Error) -> Self { + Self::IOError(io_error) } } diff --git a/streamson-tokio/Cargo.toml b/streamson-tokio/Cargo.toml new file mode 100644 index 0000000..66b8dd9 --- /dev/null +++ b/streamson-tokio/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "streamson-tokio" +version = "0.1.0" +authors = ["Stepan Henek"] +edition = "2018" +description = "Tokio and streamson integration library" +license = "MIT" +readme = "README.md" +keywords = ["json", "tokio"] +repository = "https://github.com/shenek/streamson" +categories = ["parsing"] + +[dependencies] +bytes = "0.5" +streamson-lib = { version = "0.1.0", path = "../streamson-lib/" } +tokio-util = { version = "0.3", features = ["codec"] } + +[dev-dependencies] +tokio = { version = "0.2", features = ["full"] } diff --git a/streamson-tokio/README.md b/streamson-tokio/README.md new file mode 100644 index 0000000..576bc29 --- /dev/null +++ b/streamson-tokio/README.md @@ -0,0 +1,22 @@ +# Streamson tokio + +A library which integrates streamson with tokio. +So that you can easily split jsons using asynchronous rust. + +## Examples +### Reading a large file +```rust + use std::io; + use streamson_lib::error; + use streamson_tokio::decoder::SimpleExtractor; + use tokio::{fs, stream::StreamExt}; + use tokio_util::codec::FramedRead; + + let mut file = fs::File::open("/tmp/large.json").await?; + let extractor = SimpleExtractor::new(vec![r#"{"users"}[]"#, r#"{"groups"}[]"#]); + let mut output = FramedRead::new(file, extractor); + while let Some(item) = output.next().await { + let (path, data) = item?; + // Do something with extracted data + } +``` diff --git a/streamson-tokio/src/decoder.rs b/streamson-tokio/src/decoder.rs new file mode 100644 index 0000000..d04b1c2 --- /dev/null +++ b/streamson-tokio/src/decoder.rs @@ -0,0 +1,132 @@ +//! Decoders which implement `tokio_util::codec::Decoder` +//! 
and are able to extract (path, bytes) items for AsyncRead +//! + +use bytes::{Bytes, BytesMut}; +use std::sync::{Arc, Mutex}; +use streamson_lib::{error, handler, matcher, Collector}; +use tokio_util::codec::Decoder; + +/// This struct uses `streamson_lib::matcher::Simple` +/// to decode data. +/// +/// # Examples +/// ``` +/// use std::io; +/// use streamson_lib::error; +/// use streamson_tokio::decoder::SimpleExtractor; +/// use tokio::{fs, stream::StreamExt}; +/// use tokio_util::codec::FramedRead; +/// +/// async fn process() -> Result<(), error::General> { +/// let mut file = fs::File::open("/tmp/large.json").await?; +/// let extractor = SimpleExtractor::new(vec![r#"{"users"}[]"#, r#"{"groups"}[]"#]); +/// let mut output = FramedRead::new(file, extractor); +/// while let Some(item) = output.next().await { +/// let (path, data) = item?; +/// // Do something with extracted data +/// } +/// Ok(()) +/// } +/// ``` +pub struct SimpleExtractor { + collector: Collector, + handler: Arc>, +} + +impl SimpleExtractor { + /// Creates a new `SimpleExtractor` + /// + /// # Arguments + /// * `matches` - a list of valid matches (see `streamson_lib::matcher::Simple`) + pub fn new

(matches: Vec

) -> Self + where + P: ToString, + { + // TODO limit max length and fail when reached + let handler = Arc::new(Mutex::new(handler::Buffer::new())); + let mut collector = Collector::new(); + for path_match in matches { + collector = collector.add_matcher( + Box::new(matcher::Simple::new(path_match)), + &[handler.clone()], + ); + } + Self { collector, handler } + } +} + +impl Decoder for SimpleExtractor { + type Item = (String, Bytes); + type Error = error::General; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + loop { + { + // pop if necessary + let mut handler = self.handler.lock().unwrap(); + if let Some((path, bytes)) = handler.pop() { + return Ok(Some((path, bytes))); + } + // handler is unlocked here so it can be used later within the `process` method + } + if buf.is_empty() { + // end has been reached + return Ok(None); + } + let data = buf.split_to(buf.len()); + self.collector.process(&data[..])?; + } + } +} + +#[cfg(test)] +mod tests { + use super::SimpleExtractor; + use bytes::Bytes; + use std::io::Cursor; + use tokio::stream::StreamExt; + use tokio_util::codec::FramedRead; + + #[tokio::test] + async fn basic() { + let cursor = + Cursor::new(br#"{"users": ["mike","john"], "groups": ["admin", "staff"]}"#.to_vec()); + let extractor = SimpleExtractor::new(vec![r#"{"users"}[]"#, r#"{"groups"}[]"#]); + let mut output = FramedRead::new(cursor, extractor); + + assert_eq!( + output.next().await.unwrap().unwrap(), + ( + r#"{"users"}[0]"#.to_string(), + Bytes::from_static(br#""mike""#) + ) + ); + + assert_eq!( + output.next().await.unwrap().unwrap(), + ( + r#"{"users"}[1]"#.to_string(), + Bytes::from_static(br#""john""#) + ) + ); + + assert_eq!( + output.next().await.unwrap().unwrap(), + ( + r#"{"groups"}[0]"#.to_string(), + Bytes::from_static(br#""admin""#) + ) + ); + + assert_eq!( + output.next().await.unwrap().unwrap(), + ( + r#"{"groups"}[1]"#.to_string(), + Bytes::from_static(br#""staff""#) + ) + ); + + 
assert!(output.next().await.is_none()); + } +} diff --git a/streamson-tokio/src/lib.rs b/streamson-tokio/src/lib.rs new file mode 100644 index 0000000..540c521 --- /dev/null +++ b/streamson-tokio/src/lib.rs @@ -0,0 +1,5 @@ +#![crate_name = "streamson_tokio"] + +//! Library which integrates `streamson-lib` into tokio + +pub mod decoder;