Skip to content

Commit

Permalink
fix u5c deps
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Dec 16, 2024
1 parent 3687a60 commit 8384fed
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 143 deletions.
15 changes: 14 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ aws = ["aws-config", "aws-types", "aws-sdk-sqs", "aws-sdk-lambda", "aws-sdk-s3"]
sql = ["sqlx"]
gcp = ["google-cloud-pubsub", "google-cloud-googleapis", "google-cloud-default", "jsonwebtoken"]
rabbitmq = ["lapin"]
u5c = ["tonic"]
u5c = ["utxorpc", "futures"]
mithril = ["mithril-client"]
# elasticsearch = auto feature flag
# kafka = auto feature flag
Expand Down Expand Up @@ -64,7 +64,6 @@ google-cloud-pubsub = { version = "0.16.0", optional = true }
google-cloud-googleapis = { version = "0.10.0", optional = true }
google-cloud-default = { version = "0.4.0", optional = true, features = ["pubsub"] }
jsonwebtoken = { version = "8.3.0", optional = true }
tonic = { version = "0.12.3", features = ["tls", "tls-roots"], optional = true }
futures = { version = "0.3.28", optional = true }
sqlx = { version = "0.7", features = ["runtime-tokio", "tls-native-tls", "any", "sqlite", "postgres"], optional = true }
aws-config = { version = "^1.1", optional = true }
Expand All @@ -77,3 +76,4 @@ mithril-client = { version = "^0.8", optional = true, features = ["fs"] }
miette = { version = "7.2.0", features = ["fancy"] }
itertools = "0.12.1"
redis = { version = "0.27.6", optional = true }
utxorpc = { version = "0.9.0", optional = true }
209 changes: 69 additions & 140 deletions src/sources/u5c.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,9 @@
use futures::StreamExt;
use gasket::framework::*;
use serde::Deserialize;
use tonic::transport::Channel;
use tonic::Streaming;
use tracing::{debug, error};

use pallas::interop::utxorpc::spec::sync::any_chain_block::Chain;
use pallas::interop::utxorpc::spec::sync::follow_tip_response::Action;
use pallas::interop::utxorpc::spec::sync::sync_service_client::SyncServiceClient;
use pallas::interop::utxorpc::spec::sync::{
BlockRef, DumpHistoryRequest, FollowTipRequest, FollowTipResponse,
};
use pallas::interop::utxorpc::spec::sync::BlockRef;
use pallas::network::miniprotocols::Point;
use serde::Deserialize;
use tracing::debug;
use utxorpc::{CardanoSyncClient, ClientBuilder, TipEvent};

use crate::framework::*;

Expand All @@ -26,142 +18,75 @@ fn point_to_blockref(point: Point) -> Option<BlockRef> {
}

pub struct Worker {
client: SyncServiceClient<Channel>,
stream: Option<Streaming<FollowTipResponse>>,
intersect: Option<BlockRef>,
max_items_per_page: u32,
stream: utxorpc::LiveTip<utxorpc::Cardano>,
}

impl Worker {
async fn process_next(&self, stage: &mut Stage, action: &Action) -> Result<(), WorkerError> {
match action {
Action::Apply(block) => {
if let Some(chain) = &block.chain {
match chain {
Chain::Cardano(block) => {
if block.body.is_some() {
let header = block.header.as_ref().unwrap();

let block = block.body.as_ref().unwrap();

for tx in block.tx.clone() {
let evt = ChainEvent::Apply(
Point::Specific(header.slot, header.hash.to_vec()),
Record::ParsedTx(tx),
);

stage.output.send(evt.into()).await.or_panic()?;
stage.chain_tip.set(header.slot as i64);
}
}
}
async fn process_next(
&self,
stage: &mut Stage,
unit: &TipEvent<utxorpc::Cardano>,
) -> Result<(), WorkerError> {
match unit {
TipEvent::Apply(block) => {
if let Some(block) = &block.parsed {
let header = block.header.as_ref().unwrap();

let block = block.body.as_ref().unwrap();

for tx in block.tx.clone() {
let evt = ChainEvent::Apply(
Point::Specific(header.slot, header.hash.to_vec()),
Record::ParsedTx(tx),
);

stage.output.send(evt.into()).await.or_panic()?;
stage.chain_tip.set(header.slot as i64);
}
}
}
Action::Undo(block) => {
if let Some(chain) = &block.chain {
match chain {
Chain::Cardano(block) => {
if block.body.is_some() {
let header = block.header.as_ref().unwrap();

let block = block.body.as_ref().unwrap();

for tx in block.tx.clone() {
let evt = ChainEvent::Undo(
Point::Specific(header.slot, header.hash.to_vec()),
Record::ParsedTx(tx),
);

stage.output.send(evt.into()).await.or_panic()?;
stage.chain_tip.set(header.slot as i64);
}
}
}
TipEvent::Undo(block) => {
if let Some(block) = &block.parsed {
let header = block.header.as_ref().unwrap();

let block = block.body.as_ref().unwrap();

for tx in block.tx.clone() {
let evt = ChainEvent::Undo(
Point::Specific(header.slot, header.hash.to_vec()),
Record::ParsedTx(tx),
);

stage.output.send(evt.into()).await.or_panic()?;
stage.chain_tip.set(header.slot as i64);
}
}
}
Action::Reset(reset) => {
TipEvent::Reset(block) => {
stage
.output
.send(ChainEvent::Reset(Point::new(reset.index, reset.hash.to_vec())).into())
.send(ChainEvent::Reset(Point::new(block.index, block.hash.to_vec())).into())
.await
.or_panic()?;

stage.chain_tip.set(reset.index as i64);
stage.chain_tip.set(block.index as i64);
}
}

Ok(())
}

async fn next_stream(&mut self) -> Result<WorkSchedule<Vec<Action>>, WorkerError> {
if self.stream.is_none() {
let stream = self
.client
.follow_tip(FollowTipRequest::default())
.await
.or_restart()?
.into_inner();

self.stream = Some(stream);
}

let result = self.stream.as_mut().unwrap().next().await;

if result.is_none() {
return Ok(WorkSchedule::Idle);
}

let result = result.unwrap();
if let Err(err) = result {
error!("{err}");
return Err(WorkerError::Retry);
}

let response: FollowTipResponse = result.unwrap();
if response.action.is_none() {
return Ok(WorkSchedule::Idle);
}

let action = response.action.unwrap();

Ok(WorkSchedule::Unit(vec![action]))
}

async fn next_dump_history(&mut self) -> Result<WorkSchedule<Vec<Action>>, WorkerError> {
let dump_history_request = DumpHistoryRequest {
start_token: self.intersect.clone(),
max_items: self.max_items_per_page,
..Default::default()
};

let result = self
.client
.dump_history(dump_history_request)
.await
.or_restart()?
.into_inner();

self.intersect = result.next_token;

if !result.block.is_empty() {
let actions: Vec<Action> = result.block.into_iter().map(Action::Apply).collect();
return Ok(WorkSchedule::Unit(actions));
}

Ok(WorkSchedule::Idle)
}
}

#[async_trait::async_trait(?Send)]
impl gasket::framework::Worker<Stage> for Worker {
async fn bootstrap(stage: &Stage) -> Result<Self, WorkerError> {
debug!("connecting");

let client = SyncServiceClient::connect(stage.config.url.clone())
.await
.or_panic()?;
let mut client = ClientBuilder::new()
.uri(stage.config.url.as_str())
.or_panic()?
.build::<CardanoSyncClient>()
.await;

let intersect: Vec<_> = if stage.breadcrumbs.is_empty() {
stage.intersect.points().unwrap_or_default()
Expand All @@ -175,35 +100,40 @@ impl gasket::framework::Worker<Stage> for Worker {
.collect::<Vec<_>>()
.pop();

let max_items_per_page = stage.config.max_items_per_page.unwrap_or(20);
let stream = client
.follow_tip(intersect.into_iter().collect())
.await
.or_restart()?;

Ok(Self {
client,
stream: None,
max_items_per_page,
intersect,
})
Ok(Self { stream })
}

async fn schedule(&mut self, _: &mut Stage) -> Result<WorkSchedule<Vec<Action>>, WorkerError> {
if self.intersect.is_some() {
return self.next_dump_history().await;
}
async fn schedule(
&mut self,
_: &mut Stage,
) -> Result<WorkSchedule<TipEvent<utxorpc::Cardano>>, WorkerError> {
let event = self.stream.event().await.or_restart()?;

self.next_stream().await
Ok(WorkSchedule::Unit(event))
}

async fn execute(&mut self, unit: &Vec<Action>, stage: &mut Stage) -> Result<(), WorkerError> {
for action in unit {
self.process_next(stage, action).await.or_retry()?;
}
async fn execute(
&mut self,
unit: &TipEvent<utxorpc::Cardano>,
stage: &mut Stage,
) -> Result<(), WorkerError> {
self.process_next(stage, unit).await.or_retry()?;

Ok(())
}
}

#[derive(Stage)]
#[stage(name = "source-utxorpc", unit = "Vec<Action>", worker = "Worker")]
#[stage(
name = "source-utxorpc",
unit = "TipEvent<utxorpc::Cardano>",
worker = "Worker"
)]
pub struct Stage {
config: Config,
breadcrumbs: Breadcrumbs,
Expand All @@ -224,7 +154,6 @@ pub struct Stage {
#[derive(Deserialize)]
pub struct Config {
url: String,
max_items_per_page: Option<u32>,
}

impl Config {
Expand Down

0 comments on commit 8384fed

Please sign in to comment.