Skip to content

Commit

Permalink
Trying to restore repeat_fn.
Browse files Browse the repository at this point in the history
  • Loading branch information
rohitkulshreshtha committed Dec 16, 2024
1 parent 2d72685 commit 87ee9da
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 26 deletions.
1 change: 0 additions & 1 deletion hydroflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ debugging = [ "hydroflow_lang/debugging" ]

[[example]]
name = "kvs_bench"
required-features = [ "nightly" ]

[[example]]
name = "python_udf"
Expand Down
43 changes: 18 additions & 25 deletions hydroflow/examples/kvs_bench/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,31 +140,24 @@ pub fn run_server<RX>(

let mut df = hydroflow_syntax! {

simulated_put_requests = spin() -> flat_map(|_| {
let buffer_pool = buffer_pool.clone();
let pre_gen_random_numbers = &pre_gen_random_numbers;
std::iter::repeat_with(move || {
let value = BufferPool::get_from_buffer_pool(&buffer_pool);

// Did the original C++ benchmark do anything with the data..?
// Can uncomment this to modify the buffers then.
//
// let mut borrow = buff.borrow_mut().unwrap();

// let mut r = rng.sample(dist_uniform) as u64;
// for i in 0..8 {
// borrow[i] = (r % 256) as u8;
// r /= 256;
// }

let key = pre_gen_random_numbers[pre_gen_index % pre_gen_random_numbers.len()];
pre_gen_index += 1;

(KvsRequest::Put {
key,
value,
}, 99999999)
})

simulated_put_requests = repeat_fn(2000, move || {
let value = BufferPool::get_from_buffer_pool(&buffer_pool);
// Did the original C++ benchmark do anything with the data..?
// Can uncomment this to modify the buffers then.
//
// let mut borrow = buff.borrow_mut().unwrap();
// let mut r = rng.sample(dist_uniform) as u64;
// for i in 0..8 {
// borrow[i] = (r % 256) as u8;
// r /= 256;
// }
let key = pre_gen_random_numbers[pre_gen_index % pre_gen_random_numbers.len()];
pre_gen_index += 1;
(KvsRequest::Put {
key,
value,
}, 99999999)
});

union_puts_and_gossip_requests = union();
Expand Down
2 changes: 2 additions & 0 deletions hydroflow_lang/src/graph/ops/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Hydroflow's operators

use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::ops::{Bound, RangeBounds};
Expand Down Expand Up @@ -296,6 +297,7 @@ declare_ops![
py_udf::PY_UDF,
reduce::REDUCE,
spin::SPIN,
repeat_fn::REPEAT_FN,
sort::SORT,
sort_by_key::SORT_BY_KEY,
source_file::SOURCE_FILE,
Expand Down
82 changes: 82 additions & 0 deletions hydroflow_lang/src/graph/ops/repeat_fn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use super::{
OperatorConstraints, OperatorWriteOutput, WriteContextArgs,
RANGE_0, RANGE_1,
};
use crate::{
diagnostic::{Diagnostic, Level},
graph::OperatorInstance,
};
use quote::quote_spanned;
use syn::Expr;
/// > 0 input streams, 1 output stream
///
/// > Arguments: A batch size per tick, and a zero argument closure to produce each item in the stream.
/// Similar to `repeat_iter`, but generates the items by calling the supplied closure instead of cloning them from an input iter
///
/// ```hydroflow
/// repeat_fn(10, || 7) -> for_each(|x| println!("{}", x));
/// ```
pub const REPEAT_FN: OperatorConstraints = OperatorConstraints {
name: "repeat_fn",
categories: &[],
hard_range_inn: RANGE_0,
soft_range_inn: RANGE_0,
hard_range_out: RANGE_1,
soft_range_out: RANGE_1,
num_args: 2,
persistence_args: RANGE_0,
type_args: RANGE_0,
is_external_input: false,
has_singleton_output: false,
flo_type: None,
ports_inn: None,
ports_out: None,
input_delaytype_fn: |_| None,
write_fn: |wc @ &WriteContextArgs {
context,
op_span,
ident,
arguments,
..
},
diagnostics| {
let func = &arguments[1];
let Expr::Closure(func) = func else {
diagnostics.push(Diagnostic::spanned(
op_span,
Level::Error,
"Second argument must be a 0 argument closure"),
);
return Err(());
};
if !func.inputs.is_empty() {
diagnostics.push(Diagnostic::spanned(
op_span,
Level::Error,
"The function supplied must take zero arguments",
));
return Err(());
}
let gen_ident = wc.make_ident("gen_fun");
let write_prologue = quote_spanned! {op_span=>
#[allow(unused_mut)] // Sometimes the closure provided is an FnMut in which case it does need mut.
let mut #gen_ident = #func;
};
let batch_size = &arguments[0];
let write_iterator = quote_spanned! {op_span=>
let #ident = {
(0..#batch_size).map(|_| #gen_ident())
};
};
let write_iterator_after = quote_spanned! {op_span=>
#context.schedule_subgraph(#context.current_subgraph(), true);
};
Ok(OperatorWriteOutput {
write_prologue,
write_iterator,
write_iterator_after,
..Default::default()
})
},
};

0 comments on commit 87ee9da

Please sign in to comment.