From 0e419322dba45f81f20a71f160eabbd2bfe12c3f Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 7 Apr 2024 16:37:16 -0300 Subject: [PATCH] feat: implement `IntoJson` filter (#768) --- examples/into_json/.gitignore | 1 + examples/into_json/daemon.toml | 24 ++++++++++++++++++++++++ src/filters/{json.rs => into_json.rs} | 5 +++-- src/filters/mod.rs | 14 +++++++------- 4 files changed, 35 insertions(+), 9 deletions(-) create mode 100644 examples/into_json/.gitignore create mode 100644 examples/into_json/daemon.toml rename src/filters/{json.rs => into_json.rs} (79%) diff --git a/examples/into_json/.gitignore b/examples/into_json/.gitignore new file mode 100644 index 00000000..6caf68af --- /dev/null +++ b/examples/into_json/.gitignore @@ -0,0 +1 @@ +output \ No newline at end of file diff --git a/examples/into_json/daemon.toml b/examples/into_json/daemon.toml new file mode 100644 index 00000000..415407ec --- /dev/null +++ b/examples/into_json/daemon.toml @@ -0,0 +1,24 @@ +[source] +type = "N2N" +peers = ["relays-new.cardano-mainnet.iohk.io:3001"] + +[intersect] +type = "Point" +value = [4493860, "ce7f821d2140419fea1a7900cf71b0c0a0e94afbb1f814a6717cff071c3b6afc"] + +[[filters]] +type = "SplitBlock" + +[[filters]] +type = "ParseCbor" + +[[filters]] +type = "IntoJson" + +[sink] +type = "FileRotate" +max_total_files = 5 +output_format = "JSONL" +output_path = "./output/logs.jsonl" +max_bytes_per_file = 5_000_000 +compress_files = true diff --git a/src/filters/json.rs b/src/filters/into_json.rs similarity index 79% rename from src/filters/json.rs rename to src/filters/into_json.rs index 1a7aa0ab..d688af1a 100644 --- a/src/filters/json.rs +++ b/src/filters/into_json.rs @@ -2,11 +2,12 @@ use gasket::framework::*; use serde::Deserialize; +use serde_json::Value as JsonValue; use crate::framework::*; #[derive(Default, Stage)] -#[stage(name = "filter-json", unit = "ChainEvent", worker = "Worker")] +#[stage(name = "into-json", unit = "ChainEvent", worker = "Worker")] pub struct Stage { pub input: FilterInputPort, pub output: FilterOutputPort, @@ -25,7 +26,7 @@ impl From<&Stage> for Worker { } gasket::impl_mapper!(|_worker: Worker, stage: Stage, unit: ChainEvent| => { - let out = unit.clone(); + let out = unit.clone().map_record(|r| Record::GenericJson(JsonValue::from(r))); stage.ops_count.inc(1); out }); diff --git a/src/filters/mod.rs b/src/filters/mod.rs index 839613c9..19b71dc7 100644 --- a/src/filters/mod.rs +++ b/src/filters/mod.rs @@ -3,7 +3,7 @@ use serde::Deserialize; use crate::framework::*; -pub mod json; +pub mod into_json; pub mod legacy_v1; pub mod noop; pub mod parse_cbor; @@ -19,7 +19,7 @@ pub mod deno; pub enum Bootstrapper { Noop(noop::Stage), SplitBlock(split_block::Stage), - Json(json::Stage), + IntoJson(into_json::Stage), LegacyV1(legacy_v1::Stage), ParseCbor(parse_cbor::Stage), Select(select::Stage), @@ -36,7 +36,7 @@ impl Bootstrapper { match self { Bootstrapper::Noop(p) => &mut p.input, Bootstrapper::SplitBlock(p) => &mut p.input, - Bootstrapper::Json(p) => &mut p.input, + Bootstrapper::IntoJson(p) => &mut p.input, Bootstrapper::LegacyV1(p) => &mut p.input, Bootstrapper::ParseCbor(p) => &mut p.input, Bootstrapper::Select(p) => &mut p.input, @@ -53,7 +53,7 @@ impl Bootstrapper { match self { Bootstrapper::Noop(p) => &mut p.output, Bootstrapper::SplitBlock(p) => &mut p.output, - Bootstrapper::Json(p) => &mut p.output, + Bootstrapper::IntoJson(p) => &mut p.output, Bootstrapper::LegacyV1(p) => &mut p.output, Bootstrapper::ParseCbor(p) => &mut p.output, Bootstrapper::Select(p) => &mut p.output, @@ -70,7 +70,7 @@ impl Bootstrapper { match self { Bootstrapper::Noop(x) => gasket::runtime::spawn_stage(x, policy), Bootstrapper::SplitBlock(x) => gasket::runtime::spawn_stage(x, policy), - Bootstrapper::Json(x) => gasket::runtime::spawn_stage(x, policy), + Bootstrapper::IntoJson(x) => gasket::runtime::spawn_stage(x, policy), Bootstrapper::LegacyV1(x) => gasket::runtime::spawn_stage(x, policy), Bootstrapper::ParseCbor(x) => gasket::runtime::spawn_stage(x, policy), Bootstrapper::Select(x) => gasket::runtime::spawn_stage(x, policy), @@ -89,7 +89,7 @@ impl Bootstrapper { pub enum Config { Noop(noop::Config), SplitBlock(split_block::Config), - Json(json::Config), + IntoJson(into_json::Config), LegacyV1(legacy_v1::Config), ParseCbor(parse_cbor::Config), Select(select::Config), @@ -106,7 +106,7 @@ impl Config { match self { Config::Noop(c) => Ok(Bootstrapper::Noop(c.bootstrapper(ctx)?)), Config::SplitBlock(c) => Ok(Bootstrapper::SplitBlock(c.bootstrapper(ctx)?)), - Config::Json(c) => Ok(Bootstrapper::Json(c.bootstrapper(ctx)?)), + Config::IntoJson(c) => Ok(Bootstrapper::IntoJson(c.bootstrapper(ctx)?)), Config::LegacyV1(c) => Ok(Bootstrapper::LegacyV1(c.bootstrapper(ctx)?)), Config::ParseCbor(c) => Ok(Bootstrapper::ParseCbor(c.bootstrapper(ctx)?)), Config::Select(c) => Ok(Bootstrapper::Select(c.bootstrapper(ctx)?)),