Skip to content

Commit

Permalink
feat: use block data for u5c
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Dec 16, 2024
1 parent 887e49a commit 8d59498
Showing 1 changed file with 32 additions and 27 deletions.
59 changes: 32 additions & 27 deletions src/sources/u5c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use pallas::interop::utxorpc::spec::sync::BlockRef;
use pallas::network::miniprotocols::Point;
use serde::Deserialize;
use tracing::debug;
use utxorpc::{CardanoSyncClient, ClientBuilder, TipEvent};
use utxorpc::{CardanoSyncClient, ChainBlock, ClientBuilder, TipEvent};

use crate::framework::*;

Expand All @@ -22,45 +22,49 @@ pub struct Worker {
}

impl Worker {
fn block_to_record(
&self,
stage: &Stage,
block: &ChainBlock<utxorpc::spec::cardano::Block>,
) -> Result<(Point, Record), WorkerError> {
let parsed = block.parsed.as_ref().ok_or(WorkerError::Panic)?;

let record = if stage.config.use_parsed_blocks {
Record::ParsedBlock(parsed.clone())
} else {
Record::CborBlock(block.native.to_vec())
};

let point = parsed
.header
.as_ref()
.map(|h| Point::Specific(h.slot, h.hash.to_vec()))
.ok_or(WorkerError::Panic)?;

Ok((point, record))
}

async fn process_next(
&self,
stage: &mut Stage,
unit: &TipEvent<utxorpc::Cardano>,
) -> Result<(), WorkerError> {
match unit {
TipEvent::Apply(block) => {
if let Some(block) = &block.parsed {
let header = block.header.as_ref().unwrap();
let (point, record) = self.block_to_record(stage, block)?;

let block = block.body.as_ref().unwrap();
let evt = ChainEvent::Apply(point.clone(), record);

for tx in block.tx.clone() {
let evt = ChainEvent::Apply(
Point::Specific(header.slot, header.hash.to_vec()),
Record::ParsedTx(tx),
);

stage.output.send(evt.into()).await.or_panic()?;
stage.chain_tip.set(header.slot as i64);
}
}
stage.output.send(evt.into()).await.or_panic()?;
stage.chain_tip.set(point.slot_or_default() as i64);
}
TipEvent::Undo(block) => {
if let Some(block) = &block.parsed {
let header = block.header.as_ref().unwrap();

let block = block.body.as_ref().unwrap();
let (point, record) = self.block_to_record(stage, block)?;

for tx in block.tx.clone() {
let evt = ChainEvent::Undo(
Point::Specific(header.slot, header.hash.to_vec()),
Record::ParsedTx(tx),
);
let evt = ChainEvent::Undo(point.clone(), record);

stage.output.send(evt.into()).await.or_panic()?;
stage.chain_tip.set(header.slot as i64);
}
}
stage.output.send(evt.into()).await.or_panic()?;
stage.chain_tip.set(point.slot_or_default() as i64);
}
TipEvent::Reset(block) => {
stage
Expand Down Expand Up @@ -154,6 +158,7 @@ pub struct Stage {
#[derive(Deserialize)]
pub struct Config {
url: String,
use_parsed_blocks: bool,
}

impl Config {
Expand Down

0 comments on commit 8d59498

Please sign in to comment.