Skip to content

Commit

Permalink
Switching to int keys and dom pair vals.
Browse files Browse the repository at this point in the history
  • Loading branch information
rohitkulshreshtha committed Dec 16, 2024
1 parent b6c54ff commit 2d72685
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 89 deletions.
2 changes: 1 addition & 1 deletion datastores/gossip_kv/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ enum InteractiveCommands {
/// Upsert a value in the store.
Set {
#[arg(value_parser = parse_key, required = true, help = "Key to set")]
key: Key,
key: u64,
value: String,
},
/// Delete a value from the store.
Expand Down
2 changes: 1 addition & 1 deletion datastores/gossip_kv/kv/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub enum ClientRequest {
/// A request to get the value of a key.
Get { key: Key },
/// A request to set the value of a key.
Set { key: Key, value: String },
Set { key: u64, value: String },
/// A request to delete the value of a key.
Delete { key: Key },
}
Expand Down
50 changes: 23 additions & 27 deletions datastores/gossip_kv/kv/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub type TableMap<V> = MapUnionHashMap<TableName, Table<V>>;

pub type NamespaceMap<V> = MapUnionHashMap<Namespace, TableMap<V>>;

pub type Namespaces<C> = NamespaceMap<RowValue<C>>;
pub type Namespaces<C> = MapUnionHashMap<u64, RowValue<C>>;

/// Timestamps used in the model.
// TODO: This will be updated to use a more sophisticated clock type with https://github.com/hydro-project/hydroflow/issues/1207.
Expand All @@ -42,36 +42,32 @@ pub type Clock = Max<u64>;
/// - `val`: Row value.
/// `Namespaces` element to upsert a row.
///
/// Merge the returned lattice into an existing `Namespaces` to upsert a row.
///
/// Parameters:
/// - `row_ts`: New timestamp of the row being upserted.
/// - `key`: Primary key of the row (plain `u64` after the int-key migration).
/// - `val`: Row value.
///
/// NOTE(review): reconstructed post-commit form — the rendered diff had lost
/// its +/- markers, interleaving the old (`ns`/`table_name`/`RowKey`) and new
/// (`u64` key) versions of this function.
pub fn upsert_row<C>(
    row_ts: C,
    key: u64,
    val: String,
) -> Namespaces<C> {
    // Pair the value with its timestamp so lattice merges resolve conflicts
    // by the dominating clock.
    let value: RowValue<C> = RowValue::new_from(row_ts, SetUnionHashSet::new_from([val]));
    // Flat layout: key -> timestamped value, no namespace/table nesting.
    Namespaces::new_from([(key, value)])
}
// NOTE(review): `delete_row` was commented out as part of the migration to
// plain `u64` keys (its signature still uses the removed `Namespace` /
// `TableName` / `RowKey` types). TODO: re-enable once deletes are reworked
// for the flat key layout, or remove this dead code.
//
// /// TableMap element to delete a row from an existing TableMap.
// ///
// /// Merge this into an existing TableMap to delete a row from a table.
// ///
// /// Parameters:
// /// - `row_ts`: New timestamp of the row being deleted.
// /// - `table_name`: Name of the table.
// /// - `key`: Primary key of the row.
// pub fn delete_row<C>(
//     row_ts: C,
//     ns: Namespace,
//     table_name: TableName,
//     key: RowKey,
// ) -> Namespaces<C> {
//     let value: RowValue<C> = RowValue::new_from(row_ts, SetUnionHashSet::new_from([]));
//     let row: Table<RowValue<C>> = Table::new_from([(key, value)]);
//     let table = TableMap::new_from([(table_name, row)]);
//     Namespaces::new_from([(ns, table)])
// }

#[cfg(test)]
mod tests {
Expand Down
116 changes: 59 additions & 57 deletions datastores/gossip_kv/kv/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tracing::{info, trace};
use crate::lattices::BoundedSetLattice;
use crate::membership::{MemberData, MemberId};
use crate::model::{
delete_row, upsert_row, Clock, NamespaceMap, Namespaces, RowKey, RowValue, TableMap, TableName,
upsert_row, Clock, NamespaceMap, Namespaces, RowKey, RowValue, TableMap, TableName,
};
use crate::util::{ClientRequestWithAddress, GossipRequestWithAddress};
use crate::GossipMessage::{Ack, Nack};
Expand Down Expand Up @@ -114,43 +114,45 @@ where
});

// Setup member metadata for this process.
on_start -> map(|_| upsert_row(Clock::new(0), Namespace::System, "members".to_string(), my_member_id.clone(), serde_json::to_string(&member_info).unwrap()))
-> writes;
// on_start -> map(|_| upsert_row(Clock::new(0), Namespace::System, "members".to_string(), my_member_id.clone(), serde_json::to_string(&member_info).unwrap()))
// -> writes;

client_out =
inspect(|(resp, addr)| trace!("{:?}: Sending response: {:?} to {:?}.", context.current_tick(), resp, addr))
-> dest_sink(client_outputs);
// client_out =
// inspect(|(resp, addr)| trace!("{:?}: Sending response: {:?} to {:?}.", context.current_tick(), resp, addr))
// -> dest_sink(client_outputs);

client_in = source_stream(client_inputs)
-> map(|(msg, addr)| ClientRequestWithAddress::from_request_and_address(msg, addr))
-> demux_enum::<ClientRequestWithAddress<Addr>>();

client_in[Get]
-> inspect(|req| trace!("{:?}: Received Get request: {:?}.", context.current_tick(), req))
-> map(|(key, addr) : (Key, Addr)| {
let row = MapUnionHashMap::new_from([
(
key.row_key,
SetUnionHashSet::new_from([addr /* to respond with the result later*/])
),
]);
let table = MapUnionHashMap::new_from([(key.table, row)]);
MapUnionHashMap::new_from([(key.namespace, table)])
})
-> reads;
-> null();
// -> map(|(key, addr) : (Key, Addr)| {
// let row = MapUnionHashMap::new_from([
// (
// key.row_key,
// SetUnionHashSet::new_from([addr /* to respond with the result later*/])
// ),
// ]);
// let table = MapUnionHashMap::new_from([(key.table, row)]);
// MapUnionHashMap::new_from([(key.namespace, table)])
// })
// -> reads;

client_in[Set]
-> inspect(|request| trace!("{:?}: Received Set request: {:?}.", context.current_tick(), request))
-> map(|(key, value, _addr) : (Key, String, Addr)| upsert_row(Clock::new(context.current_tick().0), key.namespace, key.table, key.row_key, value))
-> map(|(key, value, _addr) : (u64, String, Addr)| upsert_row(Clock::new(context.current_tick().0), key, value))
-> inspect(|_| {
SETS_COUNTER.inc(); // Bump SET metrics
})
-> writes;

client_in[Delete]
-> inspect(|req| trace!("{:?}: Received Delete request: {:?}.", context.current_tick(), req))
-> map(|(key, _addr) : (Key, Addr)| delete_row(Clock::new(context.current_tick().0), key.namespace, key.table, key.row_key))
-> writes;
-> null();
// -> inspect(|req| trace!("{:?}: Received Delete request: {:?}.", context.current_tick(), req))
// -> map(|(key, _addr) : (Key, Addr)| delete_row(Clock::new(context.current_tick().0), key.namespace, key.table, key.row_key))
// -> writes;

gossip_in = source_stream(gossip_inputs)
-> map(|(msg, addr)| GossipRequestWithAddress::from_request_and_address(msg, addr))
Expand Down Expand Up @@ -233,43 +235,43 @@ where
namespaces = state::<'static, Namespaces::<Clock>>();
new_writes = namespaces -> tee(); // TODO: Use the output from here to generate NACKs / ACKs

reads = state::<'tick, MapUnionHashMap<Namespace, MapUnionHashMap<TableName, MapUnionHashMap<RowKey, SetUnionHashSet<Addr>>>>>();

new_writes -> [0]process_system_table_reads;
reads -> [1]process_system_table_reads;

process_system_table_reads = lattice_bimorphism(KeyedBimorphism::<HashMap<_, _>, _>::new(KeyedBimorphism::<HashMap<_, _>, _>::new(KeyedBimorphism::<HashMap<_, _>, _>::new(PairBimorphism))), #namespaces, #reads)
-> lattice_reduce::<'tick>() // TODO: This can be removed if we fix https://github.com/hydro-project/hydroflow/issues/1401. Otherwise the result can be returned twice if get & gossip arrive in the same tick.
-> flat_map(|result: NamespaceMap<Pair<RowValue<Clock>, SetUnion<HashSet<Addr>>>>| {

let mut response: Vec<(ClientResponse, Addr)> = vec![];

let result = result.as_reveal_ref();

for (namespace, tables) in result.iter() {
for (table_name, table) in tables.as_reveal_ref().iter() {
for (row_key, join_results) in table.as_reveal_ref().iter() {
let key = Key {
namespace: *namespace,
table: table_name.clone(),
row_key: row_key.clone(),
};
// reads = state::<'tick, MapUnionHashMap<Namespace, MapUnionHashMap<TableName, MapUnionHashMap<RowKey, SetUnionHashSet<Addr>>>>>();

let timestamped_values = join_results.as_reveal_ref().0;
let all_values = timestamped_values.as_reveal_ref().1.as_reveal_ref();

let all_addresses = join_results.as_reveal_ref().1.as_reveal_ref();
let socket_addr = all_addresses.iter().find_or_first(|_| true).unwrap();

response.push((
ClientResponse::Get {key, value: all_values.clone()},
socket_addr.clone(),
));
}
}
}
response
}) -> client_out;
// new_writes -> [0]process_system_table_reads;
// reads -> [1]process_system_table_reads;
//
// process_system_table_reads = lattice_bimorphism(KeyedBimorphism::<HashMap<_, _>, _>::new(KeyedBimorphism::<HashMap<_, _>, _>::new(KeyedBimorphism::<HashMap<_, _>, _>::new(PairBimorphism))), #namespaces, #reads)
// -> lattice_reduce::<'tick>() // TODO: This can be removed if we fix https://github.com/hydro-project/hydroflow/issues/1401. Otherwise the result can be returned twice if get & gossip arrive in the same tick.
// -> flat_map(|result: NamespaceMap<Pair<RowValue<Clock>, SetUnion<HashSet<Addr>>>>| {
//
// let mut response: Vec<(ClientResponse, Addr)> = vec![];
//
// let result = result.as_reveal_ref();
//
// for (namespace, tables) in result.iter() {
// for (table_name, table) in tables.as_reveal_ref().iter() {
// for (row_key, join_results) in table.as_reveal_ref().iter() {
// let key = Key {
// namespace: *namespace,
// table: table_name.clone(),
// row_key: row_key.clone(),
// };
//
// let timestamped_values = join_results.as_reveal_ref().0;
// let all_values = timestamped_values.as_reveal_ref().1.as_reveal_ref();
//
// let all_addresses = join_results.as_reveal_ref().1.as_reveal_ref();
// let socket_addr = all_addresses.iter().find_or_first(|_| true).unwrap();
//
// response.push((
// ClientResponse::Get {key, value: all_values.clone()},
// socket_addr.clone(),
// ));
// }
// }
// }
// response
// }) -> client_out;

// new_writes -> for_each(|x| trace!("NEW WRITE: {:?}", x));

Expand Down
2 changes: 1 addition & 1 deletion datastores/gossip_kv/kv/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub enum ClientRequestWithAddress<A> {
/// A get request with the key and the address of the client.
Get { key: Key, addr: A },
/// A set request with the key, value and the address of the client.
Set { key: Key, value: String, addr: A },
Set { key: u64, value: String, addr: A },
/// A delete request with the key and the address of the client.
Delete { key: Key, addr: A },
}
Expand Down
4 changes: 2 additions & 2 deletions datastores/gossip_kv/load_test_server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ fn run_server(

let local = task::LocalSet::new();
local.spawn_local(async move {
let key_master: Key = "/usr/table/key".parse().unwrap();
// let key_master: Key = "/usr/table/key".parse().unwrap();
loop {
let request = ClientRequest::Set {
key: key_master.clone(),
key: 100,
value: "FOOBAR".to_string(),
};
client_input_tx.send((request, UNKNOWN_ADDRESS)).await.unwrap();
Expand Down

0 comments on commit 2d72685

Please sign in to comment.