Commit 3687a60

Merge branch 'main' into chore/update-pallas-git-edge

scarmuega committed Dec 15, 2024
2 parents 21210ce + 7453ae0
Showing 9 changed files with 2,105 additions and 1,646 deletions.
3,662 changes: 2,055 additions & 1,607 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions Cargo.toml

@@ -16,7 +16,6 @@ aws = ["aws-config", "aws-types", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3"]
 sql = ["sqlx"]
 gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "google-cloud-default", "jsonwebtoken"]
 rabbitmq = ["lapin"]
-redis = ["r2d2_redis"]
 u5c = ["tonic"]
 mithril = ["mithril-client"]
 # elasticsearch = auto feature flag
@@ -56,7 +55,6 @@ file-rotate = { version = "0.7.5" }
 reqwest = { version = "0.11", features = ["json", "multipart"] }
 tokio = { version = "1", features = ["rt", "rt-multi-thread"] }
 async-trait = "0.1.68"
-
 elasticsearch = { version = "8.5.0-alpha.1", optional = true }
 murmur3 = { version = "0.5.2", optional = true }
 openssl = { version = "0.10", optional = true, features = ["vendored"] }
@@ -65,7 +63,6 @@ kafka = { version = "0.10.0", optional = true }
 google-cloud-pubsub = { version = "0.16.0", optional = true }
 google-cloud-googleapis = { version = "0.10.0", optional = true }
 google-cloud-default = { version = "0.4.0", optional = true, features = ["pubsub"] }
-r2d2_redis = { version = "0.14.0", optional = true }
 jsonwebtoken = { version = "8.3.0", optional = true }
 tonic = { version = "0.12.3", features = ["tls", "tls-roots"], optional = true }
 futures = { version = "0.3.28", optional = true }
@@ -79,3 +76,4 @@ extism = { version = "1.2.0", optional = true }
 mithril-client = { version = "^0.8", optional = true, features = ["fs"] }
 miette = { version = "7.2.0", features = ["fancy"] }
 itertools = "0.12.1"
+redis = { version = "0.27.6", optional = true }
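The swap from `r2d2_redis` to the plain `redis` crate keeps the dependency optional, so redis-backed stages still need to be feature-gated in the source tree. A minimal sketch of that wiring, mirroring the `rabbitmq` pattern visible in `src/sinks/mod.rs` further down (the module declaration is illustrative, not quoted from the crate):

```rust
// Compiled only when the crate is built with `cargo build --features redis`;
// without the feature, the module and its redis dependency drop out entirely.
#[cfg(feature = "redis")]
mod redis;
```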
24 changes: 21 additions & 3 deletions docs/pages/v2/filters/parse_cbor.mdx

@@ -1,8 +1,10 @@
 # Parse CBOR filter
 
-The `parse_cbor` filter aims to map cbor transactions to a structured transaction.
+The `parse_cbor` filter maps CBOR blocks to structured blocks and CBOR transactions to structured transactions.
 
-However, the filter will only work when the record received in the stage is CborTx in other words a transaction in Cbor format that was previously extracted from a block by another stage, otherwise, parse_cbor will ignore and pass the record to the next stage. When the record is CborTx, parse_cbor will decode and map the Cbor to a structure, so the next stage will receive the ParsedTx record. If no filter is enabled, the stages will receive the record in CborBlock format, and if only the parse_cbor filter is enabled in `daemon.toml`, it will be necessary to enable the [split_cbor](split_block) filter for the stage to receive the CborTx format.
+- When the record received in the stage is CborBlock, the filter decodes and maps it to ParsedBlock, which is passed to the next stage.
+- When the record received in the stage is CborTx, the filter decodes and maps it to ParsedTx, which is passed to the next stage. This requires enabling the [split_cbor](split_block) filter beforehand so that the stage receives the CborTx format.
+- Otherwise, parse_cbor ignores the record and passes it to the next stage.
 
 ## Configuration
 
@@ -15,7 +17,23 @@ type = "ParseCbor"
 
 ## Examples
 
-Below is an example of the data that will be sent to the sink. A block can contain many transactions, so the sink will receive an event for each transaction in json format.
+Below is an example of the data that will be sent to the sink when the filter receives a CborBlock record.
+
+```json
+{
+  "event": "apply",
+  "point": {
+    "slot": 0,
+    "hash": ""
+  },
+  "record": {
+    "header": {},
+    "body": {}
+  }
+}
+```
+
+Below is an example of the data that will be sent to the sink when the filter receives a CborTx record. A block can contain many transactions, so the sink receives one event per transaction.
 
 ```json
 {
19 changes: 8 additions & 11 deletions src/cursor/redis.rs

@@ -1,10 +1,6 @@
 use gasket::framework::*;
 use pallas::network::miniprotocols::Point;
-use r2d2_redis::{
-    r2d2::{self, Pool},
-    redis::{self, Commands},
-    RedisConnectionManager,
-};
+use redis::Commands;
 use serde::Deserialize;
 use tokio::select;
 use tracing::debug;
@@ -40,18 +36,17 @@ pub enum Unit {
 }
 
 pub struct Worker {
-    pool: Pool<RedisConnectionManager>,
+    client: redis::Client,
     key: String,
 }
 
 #[async_trait::async_trait(?Send)]
 impl gasket::framework::Worker<Stage> for Worker {
     async fn bootstrap(stage: &Stage) -> Result<Self, WorkerError> {
-        let manager = RedisConnectionManager::new(stage.url.clone()).or_panic()?;
-        let pool = r2d2::Pool::builder().build(manager).or_panic()?;
+        let client = redis::Client::open(stage.url.as_str()).or_retry()?;
 
         Ok(Self {
-            pool,
+            client,
             key: stage.key.clone(),
         })
     }
@@ -74,10 +69,12 @@ impl gasket::framework::Worker<Stage> for Worker {
             Unit::Track(x) => stage.breadcrumbs.track(x.clone()),
             Unit::Flush => {
                 let data = breadcrumbs_to_data(&stage.breadcrumbs);
-                let mut conn = self.pool.get().or_restart()?;
+                let mut conn = self.client.get_connection().or_restart()?;
 
                 let data_to_write = serde_json::to_string(&data).or_panic()?;
-                conn.set(&self.key, &data_to_write)
+
+                let _: () = conn
+                    .set(&self.key, &data_to_write)
                     .map_err(Error::custom)
                     .or_panic()?;
             }
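For reference, a self-contained sketch of the connection pattern this file moves to, using the `redis` crate's synchronous API directly instead of an r2d2 pool (the URL, key, and data below are placeholders):

```rust
use redis::Commands;

fn flush_cursor(url: &str, key: &str, data: &str) -> redis::RedisResult<()> {
    // `Client::open` only parses the URL; the TCP connection is
    // established lazily by `get_connection`.
    let client = redis::Client::open(url)?;
    let mut conn = client.get_connection()?;

    // `set` is generic over its reply type, so the unit annotation is
    // required -- the same reason the diff writes `let _: () = conn.set(..)`.
    conn.set::<_, _, ()>(key, data)?;
    Ok(())
}
```

Dropping the pool trades connection reuse for simplicity: `get_connection` opens a fresh connection on each flush, which is acceptable for an occasional cursor write.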
5 changes: 5 additions & 0 deletions src/filters/parse_cbor.rs

@@ -40,6 +40,11 @@ impl From<&Stage> for Worker {
 
 gasket::impl_mapper!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
     let output = unit.clone().try_map_record(|r| match r {
+        Record::CborBlock(cbor) => {
+            let block = trv::MultiEraBlock::decode(&cbor).or_panic()?;
+            let block = stage.mapper.map_block(&block);
+            Ok(Record::ParsedBlock(block))
+        }
         Record::CborTx(cbor) => {
             let tx = trv::MultiEraTx::decode(&cbor).or_panic()?;
             let tx = stage.mapper.map_tx(&tx);
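The new `CborBlock` arm relies on pallas' traverse API to decode a multi-era block before mapping it. A minimal standalone sketch of that decoding step (the CBOR bytes are a placeholder):

```rust
use pallas::ledger::traverse::MultiEraBlock;

fn inspect_block(cbor: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
    // Decodes a block from any supported era; the filter's `map_block`
    // step then turns this into the ParsedBlock record shown in the docs.
    let block = MultiEraBlock::decode(cbor)?;
    println!("slot {} holds {} txs", block.slot(), block.txs().len());
    Ok(())
}
```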
14 changes: 7 additions & 7 deletions src/sinks/mod.rs

@@ -3,13 +3,13 @@ use serde::Deserialize;
 
 use crate::framework::*;
 
-mod assert;
-mod common;
-mod file_rotate;
-mod noop;
-mod stdout;
-mod terminal;
-mod webhook;
+pub mod assert;
+pub mod common;
+pub mod file_rotate;
+pub mod noop;
+pub mod stdout;
+pub mod terminal;
+pub mod webhook;
 
 #[cfg(feature = "rabbitmq")]
 mod rabbitmq;
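The `mod` → `pub mod` change is what exposes the built-in sinks to downstream crates. A toy illustration of the visibility rule at play (standalone names, not the crate's own):

```rust
// Items inside a private module are unreachable from other crates even
// when the items themselves are `pub`; `pub mod` opens the whole path.
pub mod sinks {
    pub mod stdout {
        pub fn describe() -> &'static str {
            "stdout sink"
        }
    }
}

fn main() {
    // Within the defining crate both forms resolve; from another crate,
    // only the `pub mod` path would.
    println!("{}", sinks::stdout::describe());
}
```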
19 changes: 6 additions & 13 deletions src/sinks/redis.rs

@@ -1,25 +1,18 @@
-use std::ops::DerefMut;
-
 use gasket::framework::*;
-use r2d2_redis::{
-    r2d2::{self, Pool},
-    redis, RedisConnectionManager,
-};
 use serde::Deserialize;
 
 use crate::framework::*;
 
 pub struct Worker {
-    pool: Pool<RedisConnectionManager>,
+    client: redis::Client,
     stream: String,
     maxlen: Option<usize>,
 }
 
 #[async_trait::async_trait(?Send)]
 impl gasket::framework::Worker<Stage> for Worker {
     async fn bootstrap(stage: &Stage) -> Result<Self, WorkerError> {
-        let manager = RedisConnectionManager::new(stage.config.url.clone()).or_panic()?;
-        let pool = r2d2::Pool::builder().build(manager).or_panic()?;
+        let client = redis::Client::open(stage.config.url.as_str()).or_retry()?;
 
         let stream = stage
             .config
@@ -30,7 +23,7 @@ impl gasket::framework::Worker<Stage> for Worker {
         let maxlen = stage.config.stream_max_length;
 
         Ok(Self {
-            pool,
+            client,
             stream,
             maxlen,
         })
@@ -54,7 +47,7 @@ impl gasket::framework::Worker<Stage> for Worker {
 
         let payload = serde_json::Value::from(record.unwrap()).to_string();
 
-        let mut conn = self.pool.get().or_restart()?;
+        let mut conn = self.client.get_connection().or_restart()?;
 
         let mut command = redis::cmd("XADD");
         command.arg(self.stream.clone());
@@ -64,10 +57,10 @@
             command.arg(maxlen);
         }
 
-        command
+        let _: () = command
            .arg("*")
            .arg(&[point.slot_or_default().to_string(), payload])
-            .query(conn.deref_mut())
+            .query(&mut conn)
            .or_retry()?;
 
         stage.ops_count.inc(1);
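A condensed sketch of the resulting XADD path with the pool removed (stream name, cap, and payload are placeholders; the `MAXLEN` argument is assumed to be added in the lines elided from this hunk):

```rust
fn publish(
    conn: &mut redis::Connection,
    stream: &str,
    maxlen: Option<usize>,
    slot: u64,
    payload: &str,
) -> redis::RedisResult<()> {
    let mut command = redis::cmd("XADD");
    command.arg(stream);

    // Cap the stream length when configured, as the sink does.
    if let Some(maxlen) = maxlen {
        command.arg("MAXLEN").arg(maxlen);
    }

    // `*` asks Redis to generate the entry id; the reply (that id) is
    // deliberately discarded via the `()` annotation.
    let _: () = command
        .arg("*")
        .arg(&[slot.to_string(), payload.to_string()])
        .query(conn)?;

    Ok(())
}
```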
2 changes: 1 addition & 1 deletion src/sources/n2c.rs

@@ -195,7 +195,7 @@ impl gasket::framework::Worker<Stage> for Worker {
 
 #[derive(Deserialize)]
 pub struct Config {
-    socket_path: PathBuf,
+    pub socket_path: PathBuf,
 }
 
 impl Config {
2 changes: 1 addition & 1 deletion src/sources/n2n.rs

@@ -220,7 +220,7 @@ impl gasket::framework::Worker<Stage> for Worker {
 
 #[derive(Deserialize)]
 pub struct Config {
-    peers: Vec<String>,
+    pub peers: Vec<String>,
 }
 
 impl Config {
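Making these fields `pub` (here and in `n2c.rs` above) lets library consumers build source configs as plain struct literals instead of only through deserialization. A toy sketch of the difference, using standalone types rather than the crate's own:

```rust
use std::path::PathBuf;

pub mod sources {
    #[derive(Debug)]
    pub struct Config {
        pub socket_path: std::path::PathBuf, // was private before the change
    }
}

fn main() {
    // With the field public, a struct literal works outside the module;
    // with it private, `Deserialize` was the only way to obtain a Config.
    let cfg = sources::Config {
        socket_path: PathBuf::from("/tmp/node.socket"),
    };
    println!("{cfg:?}");
}
```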
