diff --git a/crates/beet_net/src/pubsub/publisher.rs b/crates/beet_net/src/pubsub/publisher.rs index 8b0b6148..98669b68 100644 --- a/crates/beet_net/src/pubsub/publisher.rs +++ b/crates/beet_net/src/pubsub/publisher.rs @@ -27,6 +27,7 @@ impl Publisher { phantom: PhantomData, } } + pub fn recast(self) -> Publisher { let Publisher { topic, diff --git a/crates/beet_net/src/relay/relay_pubsub.rs b/crates/beet_net/src/relay/relay_pubsub.rs index d00d29ba..4f6e5fa2 100644 --- a/crates/beet_net/src/relay/relay_pubsub.rs +++ b/crates/beet_net/src/relay/relay_pubsub.rs @@ -8,6 +8,17 @@ use std::sync::Arc; impl Relay { /// Create a publisher for a topic pub fn add_publisher( + &self, + address: impl Into, + method: TopicMethod, + ) -> Result> { + self.add_publisher_with_topic(Topic::new( + address, + TopicScheme::PubSub, + method, + )) + } + pub fn add_publisher_with_topic( &self, topic: impl Into, ) -> Result> { @@ -35,6 +46,18 @@ impl Relay { /// Create a subscriber for a topic pub fn add_subscriber( + &self, + address: impl Into, + method: TopicMethod, + ) -> Result> { + self.add_subscriber_with_topic(Topic::new( + address, + TopicScheme::PubSub, + method, + )) + } + /// Create a subscriber for a topic + pub fn add_subscriber_with_topic( &self, topic: impl Into, ) -> Result> { diff --git a/crates/beet_net/src/relay/relay_request.rs b/crates/beet_net/src/relay/relay_request.rs index 3fc2f9e7..8482d49a 100644 --- a/crates/beet_net/src/relay/relay_request.rs +++ b/crates/beet_net/src/relay/relay_request.rs @@ -16,8 +16,8 @@ impl Relay { let topic_res = Topic::new(address.clone(), TopicScheme::Response, method); - let req = self.add_subscriber::(topic_req)?.recast(); - let res = self.add_publisher::(topic_res)?.recast(); + let req = self.add_subscriber_with_topic::(topic_req)?.recast(); + let res = self.add_publisher_with_topic::(topic_res)?.recast(); Ok(Responder::new(req, res)) } @@ -32,8 +32,8 @@ impl Relay { let topic_res = Topic::new(address.clone(), TopicScheme::Response, method); - let req = self.add_publisher::(topic_req)?.recast(); - let res = self.add_subscriber::(topic_res)?.recast(); + let req = self.add_publisher_with_topic::(topic_req)?.recast(); + let res = self.add_subscriber_with_topic::(topic_res)?.recast(); Ok(Requester::new(req, res)) } } diff --git a/crates/beet_net/test/net_graph/endpoint.rs b/crates/beet_net/test/net_graph/endpoint.rs index aea04bc4..deeaee0b 100644 --- a/crates/beet_net/test/net_graph/endpoint.rs +++ b/crates/beet_net/test/net_graph/endpoint.rs @@ -4,12 +4,12 @@ use sweet::*; #[sweet_test(non_send, skip)] pub async fn calls_topic_added() -> Result<()> { - let topic = Topic::pubsub_update("foo/bar"); let relay = Relay::default(); let mut on_change = relay.topic_set_changed(); expect(on_change.try_recv_all()?.len()).to_be(0)?; - let _some_pub = relay.add_publisher::(&topic)?; + let _some_pub = + relay.add_publisher::("foo/bar", TopicMethod::Update)?; expect(on_change.try_recv_all()?.len()).to_be(1)?; Ok(()) diff --git a/crates/beet_net/test/net_graph/pubsub.rs b/crates/beet_net/test/net_graph/pubsub.rs index 573efce0..8e2585be 100644 --- a/crates/beet_net/test/net_graph/pubsub.rs +++ b/crates/beet_net/test/net_graph/pubsub.rs @@ -5,13 +5,13 @@ use sweet::*; pub fn pubsub_fail_cases() -> Result<()> { let relay = Relay::default(); let topic = Topic::pubsub_update("foo/bar"); - let _sub = relay.add_subscriber::(&topic)?; - let bad_sub = relay.add_subscriber::(&topic); + let _sub = relay.add_subscriber_with_topic::(&topic)?; + let bad_sub = relay.add_subscriber_with_topic::(&topic); let err_str = "Type mismatch for PubSub.Update/foo/bar:0\nexpected u8, received u32"; expect(bad_sub).to_be_err_str(err_str)?; - let bad_pub = relay.add_publisher::(&topic); + let bad_pub = relay.add_publisher_with_topic::(&topic); expect(bad_pub).to_be_err_str(err_str)?; Ok(()) } @@ -23,9 +23,9 @@ pub fn pubsub_fail_cases() -> Result<()> { pub async fn pubsub() -> Result<()> { let relay = Relay::default(); let topic = Topic::pubsub_update("foo/bar"); - let mut sub1 = relay.add_subscriber::(&topic)?; + let mut sub1 = relay.add_subscriber_with_topic::(&topic)?; // let mut sub2 = relay.add_subscriber::(&topic)?; - let publisher = relay.add_publisher::(&topic)?; + let publisher = relay.add_publisher_with_topic::(&topic)?; publisher.send(&8_u8)?; let out1 = sub1.recv()?; // let out2 = sub2.recv()?; @@ -51,9 +51,9 @@ pub async fn async_broadcast() -> Result<()> { pub async fn broadcast() -> Result<()> { let relay = Relay::default(); let topic = Topic::pubsub_update("foo/bar"); - let mut sub1 = relay.add_subscriber::(&topic)?; - let mut sub2 = relay.add_subscriber::(&topic)?; - let publisher = relay.add_publisher::(&topic)?; + let mut sub1 = relay.add_subscriber_with_topic::(&topic)?; + let mut sub2 = relay.add_subscriber_with_topic::(&topic)?; + let publisher = relay.add_publisher_with_topic::(&topic)?; publisher.send(&8_u8)?; let out1 = sub1.recv()?; let out2 = sub2.recv()?; diff --git a/crates/beet_net/test/net_graph/relay.rs b/crates/beet_net/test/net_graph/relay.rs index 2626e4db..af166028 100644 --- a/crates/beet_net/test/net_graph/relay.rs +++ b/crates/beet_net/test/net_graph/relay.rs @@ -9,7 +9,7 @@ pub async fn works() -> Result<()> { let relay = Relay::default(); - let _sub1 = relay.add_subscriber::(&topic)?; + let _sub1 = relay.add_subscriber_with_topic::(&topic)?; Ok(()) diff --git a/crates/beet_net/test/net_graph/two_relays.rs b/crates/beet_net/test/net_graph/two_relays.rs index fb7434bd..56989e61 100644 --- a/crates/beet_net/test/net_graph/two_relays.rs +++ b/crates/beet_net/test/net_graph/two_relays.rs @@ -9,7 +9,7 @@ async fn receives_topic_graph_changed() -> Result<()> { let topic = Topic::pubsub_update("foo/bar"); - let _pub1 = relay1.add_publisher::(topic)?; + let _pub1 = relay1.add_publisher_with_topic::(topic)?; let messages = relay1.get_all_messages()?; expect(messages.len()).to_be(1)?; @@ -22,8 +22,8 @@ async fn cross_boundary_topic_changed() -> Result<()> { let mut relay1 = Relay::default(); let mut relay2 = Relay::default(); - let pub1 = relay1.add_publisher::(&topic)?; - let mut sub2 = relay2.add_subscriber::(&topic)?; + let pub1 = relay1.add_publisher_with_topic::(&topic)?; + let mut sub2 = relay2.add_subscriber_with_topic::(&topic)?; pub1.send(&8)?; @@ -38,12 +38,12 @@ async fn cross_boundary_errors() -> Result<()> { let topic = Topic::pubsub_update("foo/bar"); let mut relay1 = Relay::default(); - let pub1 = relay1.add_publisher::(&topic)?; + let pub1 = relay1.add_publisher_with_topic::(&topic)?; let mut relay2 = Relay::default(); pub1.send(&8)?; - let mut sub2 = relay2.add_subscriber::(&topic)?; + let mut sub2 = relay2.add_subscriber_with_topic::(&topic)?; relay1.sync_local(&mut relay2).await?; expect(sub2.recv_default_timeout()).to_be_err_str( "Type mismatch for foo/bar:0\nexpected u32, received u8",