Skip to content

Commit

Permalink
Add handlers for deserilized messages
Browse files Browse the repository at this point in the history
Each Sv2 protocol have a server and client trait. This trait have a
method that handle serilaized messages. This commit add a method that
handle deserilized messages, and modify the method for serilaized so
that it just deserilize the message and call the new method. This can be
usefull if we have more roles managed by the same rust process, to avoid
some serailization and deserialization. It also bump the minor version
of the roles-logic-sv2 lib cause it add a feature and do not breack and
API.
  • Loading branch information
UnidenifiedUser committed Mar 2, 2024
1 parent a49588d commit b6ef686
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 90 deletions.
4 changes: 2 additions & 2 deletions protocols/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion protocols/v2/roles-logic-sv2/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "roles_logic_sv2"
version = "0.1.1"
version = "0.1.2"
edition = "2018"
description = "Common handlers for use within SV2 roles"
license = "MIT OR Apache-2.0"
Expand Down
43 changes: 39 additions & 4 deletions protocols/v2/roles-logic-sv2/src/handlers/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
use common_messages_sv2::{
ChannelEndpointChanged, SetupConnection, SetupConnectionError, SetupConnectionSuccess,
};
use const_sv2::*;
use core::convert::TryInto;
use std::sync::Arc;
use tracing::{debug, error, info, trace};
Expand All @@ -33,9 +34,26 @@ where
self_: Arc<Mutex<Self>>,
message_type: u8,
payload: &mut [u8],
routing_logic: CommonRoutingLogic<Router>,
) -> Result<SendTo, Error> {
Self::handle_message_common_deserilized(
self_,
(message_type, payload).try_into(),
routing_logic,
)
}
/// Takes a message and it calls the appropriate handler function
///
/// Arguments:
///
/// * `message_type`: See [`const_sv2`].
///
fn handle_message_common_deserilized(
self_: Arc<Mutex<Self>>,
message: Result<CommonMessages<'_>, Error>,
_routing_logic: CommonRoutingLogic<Router>,
) -> Result<SendTo, Error> {
match (message_type, payload).try_into() {
match message {
Ok(CommonMessages::SetupConnectionSuccess(m)) => {
info!(
"Received SetupConnectionSuccess: version={}, flags={:b}",
Expand Down Expand Up @@ -63,7 +81,9 @@ where
.safe_lock(|x| x.handle_channel_endpoint_changed(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?
}
Ok(CommonMessages::SetupConnection(_)) => Err(Error::UnexpectedMessage(message_type)),
Ok(CommonMessages::SetupConnection(_)) => {
Err(Error::UnexpectedMessage(MESSAGE_TYPE_SETUP_CONNECTION))
}
Err(e) => Err(e),
}
}
Expand Down Expand Up @@ -106,7 +126,6 @@ where
Err(e) => Err(e),
}
}

/// It takes a message type and a payload, and if the message is a serialized setup connection
/// message, it calls the `on_setup_connection` function on the routing logic, and then calls the
/// `handle_setup_connection` function on the router
Expand All @@ -121,7 +140,23 @@ where
payload: &mut [u8],
routing_logic: CommonRoutingLogic<Router>,
) -> Result<SendTo, Error> {
match (message_type, payload).try_into() {
Self::handle_message_common_deserilized(
self_,
(message_type, payload).try_into(),
routing_logic,
)
}

/// It takes a message do setup connection message, it calls
/// the `on_setup_connection` function on the routing logic, and then calls the
/// `handle_setup_connection` function on the router
///
fn handle_message_common_deserilized(
self_: Arc<Mutex<Self>>,
message: Result<CommonMessages<'_>, Error>,
routing_logic: CommonRoutingLogic<Router>,
) -> Result<SendTo, Error> {
match message {
Ok(CommonMessages::SetupConnection(m)) => {
info!(
"Received SetupConnection: version={}, flags={:b}",
Expand Down
18 changes: 16 additions & 2 deletions protocols/v2/roles-logic-sv2/src/handlers/job_declaration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@ where
message_type: u8,
payload: &mut [u8],
) -> Result<SendTo, Error> {
match (message_type, payload).try_into() {
Self::handle_message_job_declaration_deserialized(self_, (message_type, payload).try_into())
}

fn handle_message_job_declaration_deserialized(
self_: Arc<Mutex<Self>>,
message: Result<JobDeclaration<'_>, Error>,
) -> Result<SendTo, Error> {
match message {
Ok(JobDeclaration::AllocateMiningJobTokenSuccess(message)) => {
debug!(
"Received AllocateMiningJobTokenSuccess with id: {}",
Expand Down Expand Up @@ -117,7 +124,14 @@ where
message_type: u8,
payload: &mut [u8],
) -> Result<SendTo, Error> {
match (message_type, payload).try_into() {
Self::handle_message_job_declaration_deserialized(self_, (message_type, payload).try_into())
}

fn handle_message_job_declaration_deserialized(
self_: Arc<Mutex<Self>>,
message: Result<JobDeclaration<'_>, Error>,
) -> Result<SendTo, Error> {
match message {
Ok(JobDeclaration::AllocateMiningJobToken(message)) => {
debug!(
"Received AllocateMiningJobToken with id: {}",
Expand Down
111 changes: 89 additions & 22 deletions protocols/v2/roles-logic-sv2/src/handlers/mining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
use super::SendTo_;

use crate::utils::Mutex;
use const_sv2::*;
use std::{fmt::Debug as D, sync::Arc};
use tracing::{debug, error, info, trace};

Expand Down Expand Up @@ -49,6 +50,25 @@ pub trait ParseDownstreamMiningMessages<
payload: &mut [u8],
routing_logic: MiningRoutingLogic<Self, Up, Selector, Router>,
) -> Result<SendTo<Up>, Error>
where
Self: IsMiningDownstream + Sized,
{
match Self::handle_message_mining_deserialized(
self_mutex,
(message_type, payload).try_into(),
routing_logic,
) {
Err(Error::UnexpectedMessage(0)) => Err(Error::UnexpectedMessage(message_type)),
result => result,
}
}

/// Used to route SV2 mining messages from the downstream
fn handle_message_mining_deserialized(
self_mutex: Arc<Mutex<Self>>,
message: Result<Mining<'_>, Error>,
routing_logic: MiningRoutingLogic<Self, Up, Selector, Router>,
) -> Result<SendTo<Up>, Error>
where
Self: IsMiningDownstream + Sized,
{
Expand All @@ -61,7 +81,7 @@ pub trait ParseDownstreamMiningMessages<
)
})
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?;
match (message_type, payload).try_into() {
match message {
Ok(Mining::OpenStandardMiningChannel(mut m)) => {
info!(
"Received OpenStandardMiningChannel from: {} with id: {}",
Expand Down Expand Up @@ -106,7 +126,9 @@ pub trait ParseDownstreamMiningMessages<
SupportedChannelTypes::Standard => self_mutex
.safe_lock(|self_| self_.handle_open_standard_mining_channel(m, upstream))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_OPEN_STANDARD_MINING_CHANNEL,
)),
SupportedChannelTypes::Group => self_mutex
.safe_lock(|self_| self_.handle_open_standard_mining_channel(m, upstream))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
Expand Down Expand Up @@ -137,11 +159,15 @@ pub trait ParseDownstreamMiningMessages<
channel_type
);
match channel_type {
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL,
)),
SupportedChannelTypes::Extended => self_mutex
.safe_lock(|self_| self_.handle_open_extended_mining_channel(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
SupportedChannelTypes::Group => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Group => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL,
)),
SupportedChannelTypes::GroupAndExtended => self_mutex
.safe_lock(|self_| self_.handle_open_extended_mining_channel(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
Expand Down Expand Up @@ -181,7 +207,9 @@ pub trait ParseDownstreamMiningMessages<
.safe_lock(|self_| self_.handle_submit_shares_standard(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?
}
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_SUBMIT_SHARES_STANDARD,
)),
SupportedChannelTypes::Group => {
debug!("Received SubmitSharesStandard->Group message");
trace!("SubmitSharesStandard {:?}", m);
Expand All @@ -201,11 +229,15 @@ pub trait ParseDownstreamMiningMessages<
debug!("Received SubmitSharesExtended message");
trace!("SubmitSharesExtended {:?}", m);
match channel_type {
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_SUBMIT_SHARES_EXTENDED,
)),
SupportedChannelTypes::Extended => self_mutex
.safe_lock(|self_| self_.handle_submit_shares_extended(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
SupportedChannelTypes::Group => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Group => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_SUBMIT_SHARES_EXTENDED,
)),
SupportedChannelTypes::GroupAndExtended => self_mutex
.safe_lock(|self_| self_.handle_submit_shares_extended(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
Expand All @@ -224,10 +256,10 @@ pub trait ParseDownstreamMiningMessages<
(SupportedChannelTypes::GroupAndExtended, true) => self_mutex
.safe_lock(|self_| self_.handle_set_custom_mining_job(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
_ => Err(Error::UnexpectedMessage(message_type)),
_ => Err(Error::UnexpectedMessage(MESSAGE_TYPE_SET_CUSTOM_MINING_JOB)),
}
}
Ok(_) => Err(Error::UnexpectedMessage(message_type)),
Ok(_) => Err(Error::UnexpectedMessage(0)),
Err(e) => Err(e),
}
}
Expand Down Expand Up @@ -290,12 +322,27 @@ pub trait ParseUpstreamMiningMessages<
message_type: u8,
payload: &mut [u8],
routing_logic: MiningRoutingLogic<Down, Self, Selector, Router>,
) -> Result<SendTo<Down>, Error> {
match Self::handle_message_mining_deserialized(
self_mutex,
(message_type, payload).try_into(),
routing_logic,
) {
Err(Error::UnexpectedMessage(0)) => Err(Error::UnexpectedMessage(message_type)),
result => result,
}
}

fn handle_message_mining_deserialized(
self_mutex: Arc<Mutex<Self>>,
message: Result<Mining, Error>,
routing_logic: MiningRoutingLogic<Down, Self, Selector, Router>,
) -> Result<SendTo<Down>, Error> {
let (channel_type, is_work_selection_enabled) = self_mutex
.safe_lock(|s| (s.get_channel_type(), s.is_work_selection_enabled()))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?;

match (message_type, payload).try_into() {
match message {
Ok(Mining::OpenStandardMiningChannelSuccess(mut m)) => {
let remote = match routing_logic {
MiningRoutingLogic::None => None,
Expand All @@ -313,7 +360,9 @@ pub trait ParseUpstreamMiningMessages<
SupportedChannelTypes::Standard => self_mutex
.safe_lock(|s| s.handle_open_standard_mining_channel_success(m, remote))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_OPEN_STANDARD_MINING_CHANNEL_SUCCESS,
)),
SupportedChannelTypes::Group => self_mutex
.safe_lock(|s| s.handle_open_standard_mining_channel_success(m, remote))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
Expand All @@ -326,11 +375,15 @@ pub trait ParseUpstreamMiningMessages<
info!("Received OpenExtendedMiningChannelSuccess with request id: {} and channel id: {}", m.request_id, m.channel_id);
debug!("OpenStandardMiningChannelSuccess: {:?}", m);
match channel_type {
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL_SUCCES,
)),
SupportedChannelTypes::Extended => self_mutex
.safe_lock(|s| s.handle_open_extended_mining_channel_success(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
SupportedChannelTypes::Group => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Group => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_OPEN_EXTENDED_MINING_CHANNEL_SUCCES,
)),
SupportedChannelTypes::GroupAndExtended => self_mutex
.safe_lock(|s| s.handle_open_extended_mining_channel_success(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
Expand Down Expand Up @@ -469,18 +522,24 @@ pub trait ParseUpstreamMiningMessages<
SupportedChannelTypes::Standard => self_mutex
.safe_lock(|x| x.handle_new_mining_job(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Group => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Extended => {
Err(Error::UnexpectedMessage(MESSAGE_TYPE_NEW_MINING_JOB))
}
SupportedChannelTypes::Group => {
Err(Error::UnexpectedMessage(MESSAGE_TYPE_NEW_MINING_JOB))
}
SupportedChannelTypes::GroupAndExtended => {
Err(Error::UnexpectedMessage(message_type))
Err(Error::UnexpectedMessage(MESSAGE_TYPE_NEW_MINING_JOB))
}
}
}
Ok(Mining::NewExtendedMiningJob(m)) => {
info!("Received new extended mining job for channel id: {} with job id: {} is_future: {}",m.channel_id, m.job_id, m.is_future());
debug!("NewExtendedMiningJob: {:?}", m);
match channel_type {
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_NEW_EXTENDED_MINING_JOB,
)),
SupportedChannelTypes::Extended => self_mutex
.safe_lock(|x| x.handle_new_extended_mining_job(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
Expand Down Expand Up @@ -527,7 +586,9 @@ pub trait ParseUpstreamMiningMessages<
(SupportedChannelTypes::GroupAndExtended, true) => self_mutex
.safe_lock(|x| x.handle_set_custom_mining_job_success(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
_ => Err(Error::UnexpectedMessage(message_type)),
_ => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_SET_CUSTOM_MINING_JOB_SUCCESS,
)),
}
}

Expand All @@ -546,7 +607,9 @@ pub trait ParseUpstreamMiningMessages<
(SupportedChannelTypes::GroupAndExtended, true) => self_mutex
.safe_lock(|x| x.handle_set_custom_mining_job_error(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
_ => Err(Error::UnexpectedMessage(message_type)),
_ => Err(Error::UnexpectedMessage(
MESSAGE_TYPE_SET_CUSTOM_MINING_JOB_ERROR,
)),
}
}
Ok(Mining::SetTarget(m)) => {
Expand Down Expand Up @@ -590,8 +653,12 @@ pub trait ParseUpstreamMiningMessages<
info!("Received SetGroupChannel");
debug!("SetGroupChannel: {:?}", m);
match channel_type {
SupportedChannelTypes::Standard => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Extended => Err(Error::UnexpectedMessage(message_type)),
SupportedChannelTypes::Standard => {
Err(Error::UnexpectedMessage(MESSAGE_TYPE_SET_GROUP_CHANNEL))
}
SupportedChannelTypes::Extended => {
Err(Error::UnexpectedMessage(MESSAGE_TYPE_SET_GROUP_CHANNEL))
}
SupportedChannelTypes::Group => self_mutex
.safe_lock(|x| x.handle_set_group_channel(m))
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
Expand All @@ -600,7 +667,7 @@ pub trait ParseUpstreamMiningMessages<
.map_err(|e| crate::Error::PoisonLock(e.to_string()))?,
}
}
Ok(_) => Err(Error::UnexpectedMessage(message_type)),
Ok(_) => Err(Error::UnexpectedMessage(0)),
Err(e) => Err(e),
}
}
Expand Down
Loading

0 comments on commit b6ef686

Please sign in to comment.