Skip to content

Commit

Permalink
chore: migrate n2c source to new pipeline (txpipe#598)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored May 17, 2023
1 parent 7edad56 commit a35856a
Show file tree
Hide file tree
Showing 7 changed files with 460 additions and 120 deletions.
8 changes: 6 additions & 2 deletions src/bin/oura/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use gasket::runtime::Tether;
use oura::{filters, framework::*, sinks, sources};
use pallas::ledger::traverse::wellknown::GenesisValues;
use serde::Deserialize;
use std::time::Duration;
use std::{collections::VecDeque, time::Duration};

use crate::console;

Expand Down Expand Up @@ -141,12 +141,16 @@ pub fn run(args: &Args) -> Result<(), Error> {
let config = ConfigRoot::new(&args.config).map_err(Error::config)?;

let chain = config.chain.unwrap_or_default();
let cursor = Cursor::new(config.intersect.into());
let intersect = config.intersect;
let finalize = config.finalize;
let current_dir = std::env::current_dir().unwrap();

// TODO: load from persistence mechanism
let cursor = Cursor::new(VecDeque::new());

let ctx = Context {
chain,
intersect,
finalize,
cursor,
current_dir,
Expand Down
61 changes: 17 additions & 44 deletions src/framework/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,68 +5,41 @@ use std::{

use pallas::network::miniprotocols::Point;

#[derive(Clone)]
pub enum Intersection {
Tip,
Origin,
Breadcrumbs(VecDeque<Point>),
}

const HARDCODED_BREADCRUMBS: usize = 20;

type State = VecDeque<Point>;

// TODO: include exponential breadcrumbs logic here
#[derive(Clone)]
pub struct Cursor(Arc<RwLock<Intersection>>);
pub struct Cursor(Arc<RwLock<State>>);

impl Cursor {
pub fn new(value: Intersection) -> Self {
Self(Arc::new(RwLock::new(value)))
pub fn new(state: State) -> Self {
Self(Arc::new(RwLock::new(state)))
}

pub fn read(&self) -> Intersection {
pub fn is_empty(&self) -> bool {
let v = self.0.read().unwrap();
v.is_empty()
}

pub fn clone_state(&self) -> State {
let v = self.0.read().unwrap();
v.clone()
}

pub fn latest_known_point(&self) -> Option<Point> {
let guard = self.0.read().unwrap();

match &*guard {
Intersection::Breadcrumbs(v) => v.front().cloned(),
_ => None,
}
let state = self.0.read().unwrap();
state.front().cloned()
}

pub fn add_breadcrumb(&self, value: Point) {
let mut guard = self.0.write().unwrap();

match &mut *guard {
Intersection::Tip | Intersection::Origin => {
*guard = Intersection::Breadcrumbs(VecDeque::from(vec![value]));
}
Intersection::Breadcrumbs(crumbs) => {
crumbs.push_front(value);
let mut state = self.0.write().unwrap();

if crumbs.len() > HARDCODED_BREADCRUMBS {
crumbs.pop_back();
}
}
}
}
}
state.push_front(value);

impl From<Intersection> for pallas::upstream::Intersection {
fn from(value: Intersection) -> Self {
match value {
Intersection::Tip => Self::Tip,
Intersection::Origin => Self::Origin,
Intersection::Breadcrumbs(x) => Self::Breadcrumbs(Vec::from(x)),
if state.len() > HARDCODED_BREADCRUMBS {
state.pop_back();
}
}
}

impl pallas::upstream::Cursor for Cursor {
fn intersection(&self) -> pallas::upstream::Intersection {
self.read().into()
}
}
50 changes: 12 additions & 38 deletions src/framework/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
//! Internal pipeline framework
use pallas::ledger::traverse::wellknown::GenesisValues;
use pallas::network::miniprotocols::Point;
use serde::Deserialize;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::path::PathBuf;

// we use UtxoRpc as our canonical representation of a parsed Tx
use utxorpc_spec_ledger::v1::Tx as ParsedTx;
pub use utxorpc_spec_ledger::v1::Tx as ParsedTx;

// we use GenesisValues from Pallas as our ChainConfig
pub use pallas::ledger::traverse::wellknown::GenesisValues;

pub mod cursor;
pub mod errors;
Expand Down Expand Up @@ -47,6 +48,7 @@ impl From<ChainConfig> for GenesisValues {

pub struct Context {
pub chain: GenesisValues,
pub intersect: IntersectConfig,
pub cursor: Cursor,
pub finalize: Option<FinalizeConfig>,
pub current_dir: PathBuf,
Expand Down Expand Up @@ -177,23 +179,13 @@ pub enum IntersectConfig {
Tip,
Origin,
Point(u64, String),
Fallbacks(Vec<(u64, String)>),
Breadcrumbs(Vec<(u64, String)>),
}

impl IntersectConfig {
pub fn get_point(&self) -> Option<Point> {
match self {
IntersectConfig::Point(slot, hash) => {
let hash = hex::decode(hash).expect("valid hex hash");
Some(Point::Specific(*slot, hash))
}
_ => None,
}
}

pub fn get_fallbacks(&self) -> Option<Vec<Point>> {
pub fn points(&self) -> Option<Vec<Point>> {
match self {
IntersectConfig::Fallbacks(all) => {
IntersectConfig::Breadcrumbs(all) => {
let mapped = all
.iter()
.map(|(slot, hash)| {
Expand All @@ -204,29 +196,11 @@ impl IntersectConfig {

Some(mapped)
}
_ => None,
}
}
}

impl From<IntersectConfig> for Intersection {
fn from(value: IntersectConfig) -> Self {
match value {
IntersectConfig::Tip => Intersection::Tip,
IntersectConfig::Origin => Intersection::Origin,
IntersectConfig::Point(x, y) => {
let point = Point::Specific(x, hex::decode(y).unwrap());

Intersection::Breadcrumbs(VecDeque::from(vec![point]))
}
IntersectConfig::Fallbacks(x) => {
let points: Vec<_> = x
.iter()
.map(|(x, y)| Point::Specific(*x, hex::decode(y).unwrap()))
.collect();

Intersection::Breadcrumbs(VecDeque::from(points))
IntersectConfig::Point(slot, hash) => {
let hash = hex::decode(hash).expect("valid hex hash");
Some(vec![Point::Specific(*slot, hash)])
}
_ => None,
}
}
}
Expand Down
15 changes: 8 additions & 7 deletions src/sources/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
use gasket::runtime::Tether;
use gasket::{messaging::SendPort, runtime::Tether};
use serde::Deserialize;

use crate::framework::*;

//#[cfg(target_family = "unix")]
//pub mod n2c;

pub mod n2c;
pub mod n2n;

#[cfg(feature = "aws")]
pub mod s3;

pub enum Bootstrapper {
N2N(n2n::Stage),
N2C(),
N2C(n2c::Stage),

#[cfg(feature = "aws")]
S3(s3::Stage),
Expand All @@ -22,8 +23,8 @@ pub enum Bootstrapper {
impl StageBootstrapper for Bootstrapper {
fn connect_output(&mut self, adapter: OutputAdapter) {
match self {
Bootstrapper::N2N(p) => n2n::connect_output(p, adapter),
Bootstrapper::N2C() => todo!(),
Bootstrapper::N2N(p) => p.output.connect(adapter),
Bootstrapper::N2C(p) => p.output.connect(adapter),

#[cfg(feature = "aws")]
Bootstrapper::S3(p) => p.output.connect(adapter),
Expand All @@ -37,7 +38,7 @@ impl StageBootstrapper for Bootstrapper {
fn spawn(self, policy: gasket::runtime::Policy) -> Tether {
match self {
Bootstrapper::N2N(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::N2C() => todo!(),
Bootstrapper::N2C(x) => gasket::runtime::spawn_stage(x, policy),

#[cfg(feature = "aws")]
Bootstrapper::S3(x) => gasket::runtime::spawn_stage(x, policy),
Expand All @@ -51,7 +52,7 @@ pub enum Config {
N2N(n2n::Config),

#[cfg(target_family = "unix")]
N2C,
N2C(n2c::Config),

#[cfg(feature = "aws")]
S3(s3::Config),
Expand All @@ -61,7 +62,7 @@ impl Config {
pub fn bootstrapper(self, ctx: &Context) -> Result<Bootstrapper, Error> {
match self {
Config::N2N(c) => Ok(Bootstrapper::N2N(c.bootstrapper(ctx)?)),
Config::N2C => todo!(),
Config::N2C(c) => Ok(Bootstrapper::N2C(c.bootstrapper(ctx)?)),

#[cfg(feature = "aws")]
Config::S3(c) => Ok(Bootstrapper::S3(c.bootstrapper(ctx)?)),
Expand Down
Loading

0 comments on commit a35856a

Please sign in to comment.