Skip to content

Commit

Permalink
feat: Allow skipping storage (#46)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Jun 28, 2022
1 parent 65cc3ae commit dc501c7
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/bin/scrolls/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use clap::{self, Parser};
use clap;
use scrolls::{bootstrap, crosscut, enrich, reducers, sources, storage};
use serde::Deserialize;
use std::time::Duration;
Expand Down
7 changes: 7 additions & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod redis;
pub mod skip;

use gasket::messaging::InputPort;
use serde::Deserialize;
Expand All @@ -9,6 +10,7 @@ use crate::{bootstrap, crosscut, model};
#[serde(tag = "type")]
pub enum Config {
Redis(redis::Config),
Skip(skip::Config),
}

impl Config {
Expand All @@ -19,30 +21,35 @@ impl Config {
) -> Bootstrapper {
match self {
Config::Redis(c) => Bootstrapper::Redis(c.boostrapper(chain, intersect)),
Config::Skip(c) => Bootstrapper::Skip(c.boostrapper()),
}
}
}

pub enum Bootstrapper {
Redis(redis::Bootstrapper),
Skip(skip::Bootstrapper),
}

impl Bootstrapper {
pub fn borrow_input_port(&mut self) -> &'_ mut InputPort<model::CRDTCommand> {
match self {
Bootstrapper::Redis(x) => x.borrow_input_port(),
Bootstrapper::Skip(x) => x.borrow_input_port(),
}
}

pub fn read_cursor(&mut self) -> Result<crosscut::Cursor, crate::Error> {
match self {
Bootstrapper::Redis(x) => x.read_cursor(),
Bootstrapper::Skip(x) => x.read_cursor(),
}
}

pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline) {
match self {
Bootstrapper::Redis(x) => x.spawn_stages(pipeline),
Bootstrapper::Skip(x) => x.spawn_stages(pipeline),
}
}
}
106 changes: 106 additions & 0 deletions src/storage/skip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use std::time::Duration;

use gasket::runtime::{spawn_stage, WorkOutcome};

use serde::Deserialize;

use crate::{bootstrap, crosscut, model};

type InputPort = gasket::messaging::InputPort<model::CRDTCommand>;

#[derive(Deserialize, Clone)]
pub struct Config {}

impl Config {
pub fn boostrapper(self) -> Bootstrapper {
Bootstrapper {
input: Default::default(),
}
}
}

pub struct Bootstrapper {
input: InputPort,
}

impl Bootstrapper {
pub fn borrow_input_port(&mut self) -> &'_ mut InputPort {
&mut self.input
}

pub fn read_cursor(&mut self) -> Result<crosscut::Cursor, crate::Error> {
Ok(None)
}

pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline) {
let worker = Worker {
input: self.input,
ops_count: Default::default(),
};

pipeline.register_stage(
"skip",
spawn_stage(
worker,
gasket::runtime::Policy {
tick_timeout: Some(Duration::from_secs(5)),
..Default::default()
},
),
);
}
}

pub struct Worker {
ops_count: gasket::metrics::Counter,
input: InputPort,
}

impl gasket::runtime::Worker for Worker {
fn metrics(&self) -> gasket::metrics::Registry {
gasket::metrics::Builder::new()
.with_counter("storage_ops", &self.ops_count)
.build()
}

fn work(&mut self) -> gasket::runtime::WorkResult {
let msg = self.input.recv_or_idle()?;

match msg.payload {
model::CRDTCommand::BlockStarting(point) => {
log::debug!("block started {:?}", point);
}
model::CRDTCommand::GrowOnlySetAdd(key, value) => {
log::debug!("adding to grow-only set [{}], value [{}]", key, value);
}
model::CRDTCommand::TwoPhaseSetAdd(key, value) => {
log::debug!("adding to 2-phase set [{}], value [{}]", key, value);
}
model::CRDTCommand::TwoPhaseSetRemove(key, value) => {
log::debug!("removing from 2-phase set [{}], value [{}]", key, value);
}
model::CRDTCommand::SetAdd(key, value) => {
log::debug!("adding to set [{}], value [{}]", key, value);
}
model::CRDTCommand::SetRemove(key, value) => {
log::debug!("removing from set [{}], value [{}]", key, value);
}
model::CRDTCommand::LastWriteWins(key, value, ts) => {
log::debug!("last write for [{}], value [{}], slot [{}]", key, value, ts);
}
model::CRDTCommand::AnyWriteWins(key, value) => {
log::debug!("overwrite [{}], value [{}]", key, value);
}
model::CRDTCommand::PNCounter(key, value) => {
log::debug!("increasing counter [{}], by [{}]", key, value);
}
model::CRDTCommand::BlockFinished(point) => {
log::debug!("block finished {:?}", point);
}
};

self.ops_count.inc(1);

Ok(WorkOutcome::Partial)
}
}

0 comments on commit dc501c7

Please sign in to comment.