Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: migrate n2c source to new pipeline #598

Merged
merged 1 commit into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/bin/oura/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use gasket::runtime::Tether;
use oura::{filters, framework::*, sinks, sources};
use pallas::ledger::traverse::wellknown::GenesisValues;
use serde::Deserialize;
use std::time::Duration;
use std::{collections::VecDeque, time::Duration};

use crate::console;

Expand Down Expand Up @@ -141,12 +141,16 @@ pub fn run(args: &Args) -> Result<(), Error> {
let config = ConfigRoot::new(&args.config).map_err(Error::config)?;

let chain = config.chain.unwrap_or_default();
let cursor = Cursor::new(config.intersect.into());
let intersect = config.intersect;
let finalize = config.finalize;
let current_dir = std::env::current_dir().unwrap();

// TODO: load from persistence mechanism
let cursor = Cursor::new(VecDeque::new());

let ctx = Context {
chain,
intersect,
finalize,
cursor,
current_dir,
Expand Down
61 changes: 17 additions & 44 deletions src/framework/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,68 +5,41 @@ use std::{

use pallas::network::miniprotocols::Point;

#[derive(Clone)]
pub enum Intersection {
Tip,
Origin,
Breadcrumbs(VecDeque<Point>),
}

const HARDCODED_BREADCRUMBS: usize = 20;

type State = VecDeque<Point>;

// TODO: include exponential breadcrumbs logic here
#[derive(Clone)]
pub struct Cursor(Arc<RwLock<Intersection>>);
pub struct Cursor(Arc<RwLock<State>>);

impl Cursor {
pub fn new(value: Intersection) -> Self {
Self(Arc::new(RwLock::new(value)))
pub fn new(state: State) -> Self {
Self(Arc::new(RwLock::new(state)))
}

pub fn read(&self) -> Intersection {
pub fn is_empty(&self) -> bool {
let v = self.0.read().unwrap();
v.is_empty()
}

pub fn clone_state(&self) -> State {
let v = self.0.read().unwrap();
v.clone()
}

pub fn latest_known_point(&self) -> Option<Point> {
let guard = self.0.read().unwrap();

match &*guard {
Intersection::Breadcrumbs(v) => v.front().cloned(),
_ => None,
}
let state = self.0.read().unwrap();
state.front().cloned()
}

pub fn add_breadcrumb(&self, value: Point) {
let mut guard = self.0.write().unwrap();

match &mut *guard {
Intersection::Tip | Intersection::Origin => {
*guard = Intersection::Breadcrumbs(VecDeque::from(vec![value]));
}
Intersection::Breadcrumbs(crumbs) => {
crumbs.push_front(value);
let mut state = self.0.write().unwrap();

if crumbs.len() > HARDCODED_BREADCRUMBS {
crumbs.pop_back();
}
}
}
}
}
state.push_front(value);

impl From<Intersection> for pallas::upstream::Intersection {
fn from(value: Intersection) -> Self {
match value {
Intersection::Tip => Self::Tip,
Intersection::Origin => Self::Origin,
Intersection::Breadcrumbs(x) => Self::Breadcrumbs(Vec::from(x)),
if state.len() > HARDCODED_BREADCRUMBS {
state.pop_back();
}
}
}

impl pallas::upstream::Cursor for Cursor {
fn intersection(&self) -> pallas::upstream::Intersection {
self.read().into()
}
}
50 changes: 12 additions & 38 deletions src/framework/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
//! Internal pipeline framework

use pallas::ledger::traverse::wellknown::GenesisValues;
use pallas::network::miniprotocols::Point;
use serde::Deserialize;
use std::collections::VecDeque;
use std::fmt::Debug;
use std::path::PathBuf;

// we use UtxoRpc as our canonical representation of a parsed Tx
use utxorpc_spec_ledger::v1::Tx as ParsedTx;
pub use utxorpc_spec_ledger::v1::Tx as ParsedTx;

// we use GenesisValues from Pallas as our ChainConfig
pub use pallas::ledger::traverse::wellknown::GenesisValues;

pub mod cursor;
pub mod errors;
Expand Down Expand Up @@ -47,6 +48,7 @@ impl From<ChainConfig> for GenesisValues {

pub struct Context {
pub chain: GenesisValues,
pub intersect: IntersectConfig,
pub cursor: Cursor,
pub finalize: Option<FinalizeConfig>,
pub current_dir: PathBuf,
Expand Down Expand Up @@ -177,23 +179,13 @@ pub enum IntersectConfig {
Tip,
Origin,
Point(u64, String),
Fallbacks(Vec<(u64, String)>),
Breadcrumbs(Vec<(u64, String)>),
}

impl IntersectConfig {
pub fn get_point(&self) -> Option<Point> {
match self {
IntersectConfig::Point(slot, hash) => {
let hash = hex::decode(hash).expect("valid hex hash");
Some(Point::Specific(*slot, hash))
}
_ => None,
}
}

pub fn get_fallbacks(&self) -> Option<Vec<Point>> {
pub fn points(&self) -> Option<Vec<Point>> {
match self {
IntersectConfig::Fallbacks(all) => {
IntersectConfig::Breadcrumbs(all) => {
let mapped = all
.iter()
.map(|(slot, hash)| {
Expand All @@ -204,29 +196,11 @@ impl IntersectConfig {

Some(mapped)
}
_ => None,
}
}
}

impl From<IntersectConfig> for Intersection {
fn from(value: IntersectConfig) -> Self {
match value {
IntersectConfig::Tip => Intersection::Tip,
IntersectConfig::Origin => Intersection::Origin,
IntersectConfig::Point(x, y) => {
let point = Point::Specific(x, hex::decode(y).unwrap());

Intersection::Breadcrumbs(VecDeque::from(vec![point]))
}
IntersectConfig::Fallbacks(x) => {
let points: Vec<_> = x
.iter()
.map(|(x, y)| Point::Specific(*x, hex::decode(y).unwrap()))
.collect();

Intersection::Breadcrumbs(VecDeque::from(points))
IntersectConfig::Point(slot, hash) => {
let hash = hex::decode(hash).expect("valid hex hash");
Some(vec![Point::Specific(*slot, hash)])
}
_ => None,
}
}
}
Expand Down
15 changes: 8 additions & 7 deletions src/sources/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
use gasket::runtime::Tether;
use gasket::{messaging::SendPort, runtime::Tether};
use serde::Deserialize;

use crate::framework::*;

//#[cfg(target_family = "unix")]
//pub mod n2c;

pub mod n2c;
pub mod n2n;

#[cfg(feature = "aws")]
pub mod s3;

pub enum Bootstrapper {
N2N(n2n::Stage),
N2C(),
N2C(n2c::Stage),

#[cfg(feature = "aws")]
S3(s3::Stage),
Expand All @@ -22,8 +23,8 @@ pub enum Bootstrapper {
impl StageBootstrapper for Bootstrapper {
fn connect_output(&mut self, adapter: OutputAdapter) {
match self {
Bootstrapper::N2N(p) => n2n::connect_output(p, adapter),
Bootstrapper::N2C() => todo!(),
Bootstrapper::N2N(p) => p.output.connect(adapter),
Bootstrapper::N2C(p) => p.output.connect(adapter),

#[cfg(feature = "aws")]
Bootstrapper::S3(p) => p.output.connect(adapter),
Expand All @@ -37,7 +38,7 @@ impl StageBootstrapper for Bootstrapper {
fn spawn(self, policy: gasket::runtime::Policy) -> Tether {
match self {
Bootstrapper::N2N(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::N2C() => todo!(),
Bootstrapper::N2C(x) => gasket::runtime::spawn_stage(x, policy),

#[cfg(feature = "aws")]
Bootstrapper::S3(x) => gasket::runtime::spawn_stage(x, policy),
Expand All @@ -51,7 +52,7 @@ pub enum Config {
N2N(n2n::Config),

#[cfg(target_family = "unix")]
N2C,
N2C(n2c::Config),

#[cfg(feature = "aws")]
S3(s3::Config),
Expand All @@ -61,7 +62,7 @@ impl Config {
pub fn bootstrapper(self, ctx: &Context) -> Result<Bootstrapper, Error> {
match self {
Config::N2N(c) => Ok(Bootstrapper::N2N(c.bootstrapper(ctx)?)),
Config::N2C => todo!(),
Config::N2C(c) => Ok(Bootstrapper::N2C(c.bootstrapper(ctx)?)),

#[cfg(feature = "aws")]
Config::S3(c) => Ok(Bootstrapper::S3(c.bootstrapper(ctx)?)),
Expand Down
Loading