Skip to content

Commit

Permalink
patch
Browse files Browse the repository at this point in the history
  • Loading branch information
mrchantey committed Mar 6, 2024
1 parent 70f7ab7 commit 68f8566
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 20 deletions.
1 change: 1 addition & 0 deletions crates/beet_net/src/pubsub/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ impl<T: Payload> Publisher<T> {
phantom: PhantomData,
}
}

pub fn recast<U: Payload>(self) -> Publisher<U> {
let Publisher {
topic,
Expand Down
23 changes: 23 additions & 0 deletions crates/beet_net/src/relay/relay_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ use std::sync::Arc;
impl Relay {
/// Create a publisher for a topic
pub fn add_publisher<T: Payload>(
&self,
address: impl Into<TopicAddress>,
method: TopicMethod,
) -> Result<Publisher<T>> {
self.add_publisher_with_topic(Topic::new(
address,
TopicScheme::PubSub,
method,
))
}
pub fn add_publisher_with_topic<T: Payload>(
&self,
topic: impl Into<Topic>,
) -> Result<Publisher<T>> {
Expand Down Expand Up @@ -35,6 +46,18 @@ impl Relay {

/// Create a subscriber for a topic
pub fn add_subscriber<T: Payload>(
&self,
address: impl Into<TopicAddress>,
method: TopicMethod,
) -> Result<Subscriber<T>> {
self.add_subscriber_with_topic(Topic::new(
address,
TopicScheme::PubSub,
method,
))
}
/// Create a subscriber for a topic
pub fn add_subscriber_with_topic<T: Payload>(
&self,
topic: impl Into<Topic>,
) -> Result<Subscriber<T>> {
Expand Down
8 changes: 4 additions & 4 deletions crates/beet_net/src/relay/relay_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ impl Relay {
let topic_res =
Topic::new(address.clone(), TopicScheme::Response, method);

let req = self.add_subscriber::<Req>(topic_req)?.recast();
let res = self.add_publisher::<Res>(topic_res)?.recast();
let req = self.add_subscriber_with_topic::<Req>(topic_req)?.recast();
let res = self.add_publisher_with_topic::<Res>(topic_res)?.recast();
Ok(Responder::new(req, res))
}

Expand All @@ -32,8 +32,8 @@ impl Relay {
let topic_res =
Topic::new(address.clone(), TopicScheme::Response, method);

let req = self.add_publisher::<Req>(topic_req)?.recast();
let res = self.add_subscriber::<Res>(topic_res)?.recast();
let req = self.add_publisher_with_topic::<Req>(topic_req)?.recast();
let res = self.add_subscriber_with_topic::<Res>(topic_res)?.recast();
Ok(Requester::new(req, res))
}
}
4 changes: 2 additions & 2 deletions crates/beet_net/test/net_graph/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use sweet::*;

#[sweet_test(non_send, skip)]
pub async fn calls_topic_added() -> Result<()> {
let topic = Topic::pubsub_update("foo/bar");
let relay = Relay::default();

let mut on_change = relay.topic_set_changed();
expect(on_change.try_recv_all()?.len()).to_be(0)?;
let _some_pub = relay.add_publisher::<u8>(&topic)?;
let _some_pub =
relay.add_publisher::<u8>("foo/bar", TopicMethod::Update)?;
expect(on_change.try_recv_all()?.len()).to_be(1)?;

Ok(())
Expand Down
16 changes: 8 additions & 8 deletions crates/beet_net/test/net_graph/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use sweet::*;
pub fn pubsub_fail_cases() -> Result<()> {
let relay = Relay::default();
let topic = Topic::pubsub_update("foo/bar");
let _sub = relay.add_subscriber::<u8>(&topic)?;
let bad_sub = relay.add_subscriber::<u32>(&topic);
let _sub = relay.add_subscriber_with_topic::<u8>(&topic)?;
let bad_sub = relay.add_subscriber_with_topic::<u32>(&topic);

let err_str =
"Type mismatch for PubSub.Update/foo/bar:0\nexpected u8, received u32";
expect(bad_sub).to_be_err_str(err_str)?;
let bad_pub = relay.add_publisher::<u32>(&topic);
let bad_pub = relay.add_publisher_with_topic::<u32>(&topic);
expect(bad_pub).to_be_err_str(err_str)?;
Ok(())
}
Expand All @@ -23,9 +23,9 @@ pub fn pubsub_fail_cases() -> Result<()> {
pub async fn pubsub() -> Result<()> {
let relay = Relay::default();
let topic = Topic::pubsub_update("foo/bar");
let mut sub1 = relay.add_subscriber::<u8>(&topic)?;
let mut sub1 = relay.add_subscriber_with_topic::<u8>(&topic)?;
// let mut sub2 = relay.add_subscriber::<u8>(&topic)?;
let publisher = relay.add_publisher::<u8>(&topic)?;
let publisher = relay.add_publisher_with_topic::<u8>(&topic)?;
publisher.send(&8_u8)?;
let out1 = sub1.recv()?;
// let out2 = sub2.recv()?;
Expand All @@ -51,9 +51,9 @@ pub async fn async_broadcast() -> Result<()> {
pub async fn broadcast() -> Result<()> {
let relay = Relay::default();
let topic = Topic::pubsub_update("foo/bar");
let mut sub1 = relay.add_subscriber::<u8>(&topic)?;
let mut sub2 = relay.add_subscriber::<u8>(&topic)?;
let publisher = relay.add_publisher::<u8>(&topic)?;
let mut sub1 = relay.add_subscriber_with_topic::<u8>(&topic)?;
let mut sub2 = relay.add_subscriber_with_topic::<u8>(&topic)?;
let publisher = relay.add_publisher_with_topic::<u8>(&topic)?;
publisher.send(&8_u8)?;
let out1 = sub1.recv()?;
let out2 = sub2.recv()?;
Expand Down
2 changes: 1 addition & 1 deletion crates/beet_net/test/net_graph/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub async fn works() -> Result<()> {

let relay = Relay::default();

let _sub1 = relay.add_subscriber::<u8>(&topic)?;
let _sub1 = relay.add_subscriber_with_topic::<u8>(&topic)?;


Ok(())
Expand Down
10 changes: 5 additions & 5 deletions crates/beet_net/test/net_graph/two_relays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn receives_topic_graph_changed() -> Result<()> {

let topic = Topic::pubsub_update("foo/bar");

let _pub1 = relay1.add_publisher::<u32>(topic)?;
let _pub1 = relay1.add_publisher_with_topic::<u32>(topic)?;
let messages = relay1.get_all_messages()?;

expect(messages.len()).to_be(1)?;
Expand All @@ -22,8 +22,8 @@ async fn cross_boundary_topic_changed() -> Result<()> {
let mut relay1 = Relay::default();
let mut relay2 = Relay::default();

let pub1 = relay1.add_publisher::<u32>(&topic)?;
let mut sub2 = relay2.add_subscriber::<u32>(&topic)?;
let pub1 = relay1.add_publisher_with_topic::<u32>(&topic)?;
let mut sub2 = relay2.add_subscriber_with_topic::<u32>(&topic)?;

pub1.send(&8)?;

Expand All @@ -38,12 +38,12 @@ async fn cross_boundary_errors() -> Result<()> {
let topic = Topic::pubsub_update("foo/bar");

let mut relay1 = Relay::default();
let pub1 = relay1.add_publisher::<u32>(&topic)?;
let pub1 = relay1.add_publisher_with_topic::<u32>(&topic)?;

let mut relay2 = Relay::default();

pub1.send(&8)?;
let mut sub2 = relay2.add_subscriber::<u8>(&topic)?;
let mut sub2 = relay2.add_subscriber_with_topic::<u8>(&topic)?;
relay1.sync_local(&mut relay2).await?;
expect(sub2.recv_default_timeout()).to_be_err_str(
"Type mismatch for foo/bar:0\nexpected u32, received u8",
Expand Down

0 comments on commit 68f8566

Please sign in to comment.