Skip to content

Commit

Permalink
tokio: streamson-tokio workspace added
Browse files Browse the repository at this point in the history
* contains only a simple `tokio_util::codec::Decoder` implementation
  • Loading branch information
shenek committed May 27, 2020
1 parent 269c870 commit 9f9a3f3
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
members = [
"streamson-bin",
"streamson-lib",
"streamson-tokio",
]
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ A memory efficient set of tools to split large JSONs into smaller parts.

* [streamson-lib](streamson-lib/README.md) - Core Rust library
* [streamson-bin](streamson-bin/README.md) - Binary to split JSONs
* [streamson-tokio](streamson-tokio/README.md) - Helpers to integrate streamson with tokio

## Motivation
Imagine a situation when you get a very large JSON input.
Expand Down
20 changes: 14 additions & 6 deletions streamson-lib/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Module containing errors
use std::{error::Error, fmt, str::Utf8Error};
use std::{error::Error, fmt, io, str::Utf8Error};

/// Matcher related errors
#[derive(Debug, PartialEq, Clone)]
Expand Down Expand Up @@ -68,12 +68,13 @@ impl fmt::Display for IncorrectInput {
}
}
/// General error which wraps every other error kind (handler, matcher, UTF-8, incorrect input, I/O)
#[derive(Debug, PartialEq, Clone)]
#[derive(Debug)]
pub enum General {
Handler(Handler),
Matcher(Matcher),
Utf8Error(Utf8Error),
IncorrectInput(IncorrectInput),
IOError(io::Error),
}

impl Error for General {}
Expand All @@ -84,30 +85,37 @@ impl fmt::Display for General {
Self::Matcher(err) => err.fmt(f),
Self::Utf8Error(err) => err.fmt(f),
Self::IncorrectInput(err) => err.fmt(f),
Self::IOError(err) => err.fmt(f),
}
}
}

impl From<Handler> for General {
fn from(handler: Handler) -> Self {
General::Handler(handler)
Self::Handler(handler)
}
}

impl From<Matcher> for General {
fn from(matcher: Matcher) -> Self {
General::Matcher(matcher)
Self::Matcher(matcher)
}
}

impl From<Utf8Error> for General {
fn from(utf8: Utf8Error) -> Self {
General::Utf8Error(utf8)
Self::Utf8Error(utf8)
}
}

impl From<IncorrectInput> for General {
fn from(incorrect_input: IncorrectInput) -> Self {
General::IncorrectInput(incorrect_input)
Self::IncorrectInput(incorrect_input)
}
}

impl From<io::Error> for General {
fn from(io_error: io::Error) -> Self {
Self::IOError(io_error)
}
}
19 changes: 19 additions & 0 deletions streamson-tokio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "streamson-tokio"
version = "0.1.0"
authors = ["Stepan Henek"]
edition = "2018"
description = "Tokio and streamson integration library"
license = "MIT"
readme = "README.md"
keywords = ["json", "tokio"]
repository = "https://github.com/shenek/streamson"
categories = ["parsing"]

[dependencies]
bytes = "0.5"
streamson-lib = { version = "0.1.0", path = "../streamson-lib/" }
tokio-util = { version = "0.3", features = ["codec"] }

[dev-dependencies]
tokio = { version = "0.2", features = ["full"] }
22 changes: 22 additions & 0 deletions streamson-tokio/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Streamson tokio

A library which integrates streamson with tokio,
so that you can easily split JSONs using asynchronous Rust.

## Examples
### Reading a large file
```rust
use std::io;
use streamson_lib::error;
use streamson_tokio::decoder::SimpleExtractor;
use tokio::{fs, stream::StreamExt};
use tokio_util::codec::FramedRead;

let mut file = fs::File::open("/tmp/large.json").await?;
let extractor = SimpleExtractor::new(vec![r#"{"users"}[]"#, r#"{"groups"}[]"#]);
let mut output = FramedRead::new(file, extractor);
while let Some(item) = output.next().await {
let (path, data) = item?;
// Do something with extracted data
}
```
132 changes: 132 additions & 0 deletions streamson-tokio/src/decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
//! Decoders which implement `tokio_util::codec::Decoder`
//! and are able to extract `(path, bytes)` items from an `AsyncRead` source
//!
use bytes::{Bytes, BytesMut};
use std::sync::{Arc, Mutex};
use streamson_lib::{error, handler, matcher, Collector};
use tokio_util::codec::Decoder;

/// A `tokio_util::codec::Decoder` which extracts matched parts of a JSON stream.
///
/// Paths are matched using `streamson_lib::matcher::Simple` and every match
/// is yielded as a `(path, bytes)` item.
///
/// # Examples
/// ```
/// use std::io;
/// use streamson_lib::error;
/// use streamson_tokio::decoder::SimpleExtractor;
/// use tokio::{fs, stream::StreamExt};
/// use tokio_util::codec::FramedRead;
///
/// async fn process() -> Result<(), error::General> {
/// let mut file = fs::File::open("/tmp/large.json").await?;
/// let extractor = SimpleExtractor::new(vec![r#"{"users"}[]"#, r#"{"groups"}[]"#]);
/// let mut output = FramedRead::new(file, extractor);
/// while let Some(item) = output.next().await {
/// let (path, data) = item?;
/// // Do something with extracted data
/// }
/// Ok(())
/// }
/// ```
pub struct SimpleExtractor {
/// Collector which is fed the raw input bytes in `decode`;
/// every matcher registered in `new` is bound to `handler`.
collector: Collector,
/// Buffer handler shared with `collector`; `decode` pops the
/// `(path, bytes)` items from it.
handler: Arc<Mutex<handler::Buffer>>,
}

impl SimpleExtractor {
    /// Builds a `SimpleExtractor` from a list of path matches.
    ///
    /// A single buffer handler is created and registered for every matcher,
    /// so all matched items end up in one queue.
    ///
    /// # Arguments
    /// * `matches` - a list of valid matches (see `streamson_lib::matcher::Simple`)
    pub fn new<P>(matches: Vec<P>) -> Self
    where
        P: ToString,
    {
        // TODO limit max length and fail when reached
        let buffer = Arc::new(Mutex::new(handler::Buffer::new()));

        // `add_matcher` consumes and returns the collector, so thread it
        // through a fold instead of rebinding a mutable variable.
        let collector = matches
            .into_iter()
            .fold(Collector::new(), |partial, path_match| {
                partial.add_matcher(
                    Box::new(matcher::Simple::new(path_match)),
                    &[buffer.clone()],
                )
            });

        Self {
            collector,
            handler: buffer,
        }
    }
}

impl Decoder for SimpleExtractor {
    type Item = (String, Bytes);
    type Error = error::General;

    /// Returns the next matched `(path, bytes)` item, if one is available.
    ///
    /// Already buffered items are returned first. When nothing is buffered,
    /// the whole content of `buf` is consumed and passed to the collector,
    /// and the buffer handler is checked again.
    ///
    /// # Returns
    /// * `Ok(Some(item))` - a matched item was popped from the handler
    /// * `Ok(None)` - no item is buffered and `buf` is empty
    ///
    /// # Errors
    /// Propagates any error returned by `Collector::process`.
    ///
    /// # Panics
    /// Panics when the handler mutex is poisoned.
    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        loop {
            // Pop if necessary. The lock guard is a temporary which is dropped
            // at the end of this statement, so the handler is unlocked again
            // and can be used later within the `process` method.
            let popped = self.handler.lock().unwrap().pop();
            if let Some(item) = popped {
                return Ok(Some(item));
            }

            if buf.is_empty() {
                // end has been reached
                return Ok(None);
            }

            // Take everything which is currently in the input buffer
            let chunk = buf.split();
            self.collector.process(&chunk[..])?;
        }
    }
}

#[cfg(test)]
mod tests {
    use super::SimpleExtractor;
    use bytes::Bytes;
    use std::io::Cursor;
    use tokio::stream::StreamExt;
    use tokio_util::codec::FramedRead;

    /// Reads a small JSON document through `FramedRead` and checks that
    /// the matched array elements are yielded in order, followed by the end
    /// of the stream.
    #[tokio::test]
    async fn basic() {
        let input = br#"{"users": ["mike","john"], "groups": ["admin", "staff"]}"#.to_vec();
        let extractor = SimpleExtractor::new(vec![r#"{"users"}[]"#, r#"{"groups"}[]"#]);
        let mut output = FramedRead::new(Cursor::new(input), extractor);

        // Expected (path, raw bytes) pairs in the order they should be emitted
        let expected: [(&str, &'static [u8]); 4] = [
            (r#"{"users"}[0]"#, &br#""mike""#[..]),
            (r#"{"users"}[1]"#, &br#""john""#[..]),
            (r#"{"groups"}[0]"#, &br#""admin""#[..]),
            (r#"{"groups"}[1]"#, &br#""staff""#[..]),
        ];

        for &(path, data) in expected.iter() {
            assert_eq!(
                output.next().await.unwrap().unwrap(),
                (path.to_string(), Bytes::from_static(data))
            );
        }

        assert!(output.next().await.is_none());
    }
}
5 changes: 5 additions & 0 deletions streamson-tokio/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#![crate_name = "streamson_tokio"]

//! Library which integrates `streamson-lib` into tokio
pub mod decoder;

0 comments on commit 9f9a3f3

Please sign in to comment.