1
+ use core:: result:: Result :: { Err , Ok } ;
1
2
use std:: io:: Cursor ;
2
3
use std:: net:: SocketAddr ;
3
4
use std:: sync:: Arc ;
4
5
use std:: time:: Duration ;
5
6
7
+ use anyhow:: { anyhow, Context , Result } ;
6
8
use aquatic_udp_protocol:: { ConnectRequest , Request , Response , TransactionId } ;
7
9
use log:: debug;
8
10
use tokio:: net:: UdpSocket ;
@@ -25,99 +27,120 @@ pub struct UdpClient {
25
27
}
26
28
27
29
impl UdpClient {
28
- /// # Panics
30
+ /// # Errors
31
+ ///
32
+ /// Will return error if the local address can't be bound.
29
33
///
30
- /// Will panic if the local address can't be bound.
31
- pub async fn bind ( local_address : & str ) -> Self {
32
- let valid_socket_addr = local_address
34
+ pub async fn bind ( local_address : & str ) -> Result < Self > {
35
+ let socket_addr = local_address
33
36
. parse :: < SocketAddr > ( )
34
- . unwrap_or_else ( |_| panic ! ( "{local_address} is not a valid socket address" ) ) ;
37
+ . context ( format ! ( "{local_address} is not a valid socket address" ) ) ? ;
35
38
36
- let socket = UdpSocket :: bind ( valid_socket_addr ) . await . unwrap ( ) ;
39
+ let socket = UdpSocket :: bind ( socket_addr ) . await ? ;
37
40
38
- Self {
41
+ let udp_client = Self {
39
42
socket : Arc :: new ( socket) ,
40
43
timeout : DEFAULT_TIMEOUT ,
41
- }
44
+ } ;
45
+ Ok ( udp_client)
42
46
}
43
47
44
- /// # Panics
48
+ /// # Errors
45
49
///
46
- /// Will panic if can't connect to the socket.
47
- pub async fn connect ( & self , remote_address : & str ) {
48
- let valid_socket_addr = remote_address
50
+ /// Will return error if can't connect to the socket.
51
+ pub async fn connect ( & self , remote_address : & str ) -> Result < ( ) > {
52
+ let socket_addr = remote_address
49
53
. parse :: < SocketAddr > ( )
50
- . unwrap_or_else ( |_| panic ! ( "{remote_address} is not a valid socket address" ) ) ;
54
+ . context ( format ! ( "{remote_address} is not a valid socket address" ) ) ? ;
51
55
52
- match self . socket . connect ( valid_socket_addr) . await {
53
- Ok ( ( ) ) => debug ! ( "Connected successfully" ) ,
54
- Err ( e) => panic ! ( "Failed to connect: {e:?}" ) ,
56
+ match self . socket . connect ( socket_addr) . await {
57
+ Ok ( ( ) ) => {
58
+ debug ! ( "Connected successfully" ) ;
59
+ Ok ( ( ) )
60
+ }
61
+ Err ( e) => Err ( anyhow ! ( "Failed to connect: {e:?}" ) ) ,
55
62
}
56
63
}
57
64
58
- /// # Panics
65
+ /// # Errors
59
66
///
60
- /// Will panic if:
67
+ /// Will return error if:
61
68
///
62
69
/// - Can't write to the socket.
63
70
/// - Can't send data.
64
- pub async fn send ( & self , bytes : & [ u8 ] ) -> usize {
71
+ pub async fn send ( & self , bytes : & [ u8 ] ) -> Result < usize > {
65
72
debug ! ( target: "UDP client" , "sending {bytes:?} ..." ) ;
66
73
67
74
match time:: timeout ( self . timeout , self . socket . writable ( ) ) . await {
68
- Ok ( writable_result) => match writable_result {
69
- Ok ( ( ) ) => ( ) ,
70
- Err ( e) => panic ! ( "{}" , format!( "IO error waiting for the socket to become readable: {e:?}" ) ) ,
71
- } ,
72
- Err ( e) => panic ! ( "{}" , format!( "Timeout waiting for the socket to become readable: {e:?}" ) ) ,
75
+ Ok ( writable_result) => {
76
+ match writable_result {
77
+ Ok ( ( ) ) => ( ) ,
78
+ Err ( e) => return Err ( anyhow ! ( "IO error waiting for the socket to become readable: {e:?}" ) ) ,
79
+ } ;
80
+ }
81
+ Err ( e) => return Err ( anyhow ! ( "Timeout waiting for the socket to become readable: {e:?}" ) ) ,
73
82
} ;
74
83
75
84
match time:: timeout ( self . timeout , self . socket . send ( bytes) ) . await {
76
85
Ok ( send_result) => match send_result {
77
- Ok ( size) => size,
78
- Err ( e) => panic ! ( "{}" , format !( "IO error during send: {e:?}" ) ) ,
86
+ Ok ( size) => Ok ( size) ,
87
+ Err ( e) => Err ( anyhow ! ( "IO error during send: {e:?}" ) ) ,
79
88
} ,
80
- Err ( e) => panic ! ( "{}" , format !( "Send operation timed out: {e:?}" ) ) ,
89
+ Err ( e) => Err ( anyhow ! ( "Send operation timed out: {e:?}" ) ) ,
81
90
}
82
91
}
83
92
84
- /// # Panics
93
+ /// # Errors
85
94
///
86
- /// Will panic if:
95
+ /// Will return error if:
87
96
///
88
97
/// - Can't read from the socket.
89
98
/// - Can't receive data.
90
- pub async fn receive ( & self , bytes : & mut [ u8 ] ) -> usize {
99
+ ///
100
+ /// # Panics
101
+ ///
102
+ pub async fn receive ( & self , bytes : & mut [ u8 ] ) -> Result < usize > {
91
103
debug ! ( target: "UDP client" , "receiving ..." ) ;
92
104
93
105
match time:: timeout ( self . timeout , self . socket . readable ( ) ) . await {
94
- Ok ( readable_result) => match readable_result {
95
- Ok ( ( ) ) => ( ) ,
96
- Err ( e) => panic ! ( "{}" , format!( "IO error waiting for the socket to become readable: {e:?}" ) ) ,
97
- } ,
98
- Err ( e) => panic ! ( "{}" , format!( "Timeout waiting for the socket to become readable: {e:?}" ) ) ,
106
+ Ok ( readable_result) => {
107
+ match readable_result {
108
+ Ok ( ( ) ) => ( ) ,
109
+ Err ( e) => return Err ( anyhow ! ( "IO error waiting for the socket to become readable: {e:?}" ) ) ,
110
+ } ;
111
+ }
112
+ Err ( e) => return Err ( anyhow ! ( "Timeout waiting for the socket to become readable: {e:?}" ) ) ,
99
113
} ;
100
114
101
- let size = match time:: timeout ( self . timeout , self . socket . recv ( bytes) ) . await {
115
+ let size_result = match time:: timeout ( self . timeout , self . socket . recv ( bytes) ) . await {
102
116
Ok ( recv_result) => match recv_result {
103
- Ok ( size) => size,
104
- Err ( e) => panic ! ( "{}" , format !( "IO error during send: {e:?}" ) ) ,
117
+ Ok ( size) => Ok ( size) ,
118
+ Err ( e) => Err ( anyhow ! ( "IO error during send: {e:?}" ) ) ,
105
119
} ,
106
- Err ( e) => panic ! ( "{}" , format !( "Receive operation timed out: {e:?}" ) ) ,
120
+ Err ( e) => Err ( anyhow ! ( "Receive operation timed out: {e:?}" ) ) ,
107
121
} ;
108
122
109
- debug ! ( target: "UDP client" , "{size} bytes received {bytes:?}" ) ;
110
-
111
- size
123
+ if size_result. is_ok ( ) {
124
+ let size = size_result. as_ref ( ) . unwrap ( ) ;
125
+ debug ! ( target: "UDP client" , "{size} bytes received {bytes:?}" ) ;
126
+ size_result
127
+ } else {
128
+ size_result
129
+ }
112
130
}
113
131
}
114
132
115
133
/// Creates a new `UdpClient` connected to a Udp server
116
- pub async fn new_udp_client_connected ( remote_address : & str ) -> UdpClient {
134
+ ///
135
+ /// # Errors
136
+ ///
137
+ /// Will return any errors present in the call stack
138
+ ///
139
+ pub async fn new_udp_client_connected ( remote_address : & str ) -> Result < UdpClient > {
117
140
let port = 0 ; // Let OS choose an unused port.
118
- let client = UdpClient :: bind ( & source_address ( port) ) . await ;
119
- client. connect ( remote_address) . await ;
120
- client
141
+ let client = UdpClient :: bind ( & source_address ( port) ) . await ? ;
142
+ client. connect ( remote_address) . await ? ;
143
+ Ok ( client)
121
144
}
122
145
123
146
#[ allow( clippy:: module_name_repetitions) ]
@@ -127,85 +150,103 @@ pub struct UdpTrackerClient {
127
150
}
128
151
129
152
impl UdpTrackerClient {
130
- /// # Panics
153
+ /// # Errors
131
154
///
132
- /// Will panic if can't write request to bytes.
133
- pub async fn send ( & self , request : Request ) -> usize {
155
+ /// Will return error if can't write request to bytes.
156
+ pub async fn send ( & self , request : Request ) -> Result < usize > {
134
157
debug ! ( target: "UDP tracker client" , "send request {request:?}" ) ;
135
158
136
159
// Write request into a buffer
137
160
let request_buffer = vec ! [ 0u8 ; MAX_PACKET_SIZE ] ;
138
161
let mut cursor = Cursor :: new ( request_buffer) ;
139
162
140
- let request_data = match request. write ( & mut cursor) {
163
+ let request_data_result = match request. write ( & mut cursor) {
141
164
Ok ( ( ) ) => {
142
165
#[ allow( clippy:: cast_possible_truncation) ]
143
166
let position = cursor. position ( ) as usize ;
144
167
let inner_request_buffer = cursor. get_ref ( ) ;
145
168
// Return slice which contains written request data
146
- & inner_request_buffer[ ..position]
169
+ Ok ( & inner_request_buffer[ ..position] )
147
170
}
148
- Err ( e) => panic ! ( "could not write request to bytes: {e}." ) ,
171
+ Err ( e) => Err ( anyhow ! ( "could not write request to bytes: {e}." ) ) ,
149
172
} ;
150
173
174
+ let request_data = request_data_result?;
175
+
151
176
self . udp_client . send ( request_data) . await
152
177
}
153
178
154
- /// # Panics
179
+ /// # Errors
155
180
///
156
- /// Will panic if can't create response from the received payload (bytes buffer).
157
- pub async fn receive ( & self ) -> Response {
181
+ /// Will return error if can't create response from the received payload (bytes buffer).
182
+ pub async fn receive ( & self ) -> Result < Response > {
158
183
let mut response_buffer = [ 0u8 ; MAX_PACKET_SIZE ] ;
159
184
160
- let payload_size = self . udp_client . receive ( & mut response_buffer) . await ;
185
+ let payload_size = self . udp_client . receive ( & mut response_buffer) . await ? ;
161
186
162
187
debug ! ( target: "UDP tracker client" , "received {payload_size} bytes. Response {response_buffer:?}" ) ;
163
188
164
- Response :: from_bytes ( & response_buffer[ ..payload_size] , true ) . unwrap ( )
189
+ let response = Response :: from_bytes ( & response_buffer[ ..payload_size] , true ) ?;
190
+
191
+ Ok ( response)
165
192
}
166
193
}
167
194
168
195
/// Creates a new `UdpTrackerClient` connected to a Udp Tracker server
169
- pub async fn new_udp_tracker_client_connected ( remote_address : & str ) -> UdpTrackerClient {
170
- let udp_client = new_udp_client_connected ( remote_address) . await ;
171
- UdpTrackerClient { udp_client }
196
+ ///
197
+ /// # Errors
198
+ ///
199
+ /// Will return any errors present in the call stack
200
+ ///
201
+ pub async fn new_udp_tracker_client_connected ( remote_address : & str ) -> Result < UdpTrackerClient > {
202
+ let udp_client = new_udp_client_connected ( remote_address) . await ?;
203
+ let udp_tracker_client = UdpTrackerClient { udp_client } ;
204
+ Ok ( udp_tracker_client)
172
205
}
173
206
174
207
/// Helper Function to Check if a UDP Service is Connectable
175
208
///
176
- /// # Errors
209
+ /// # Panics
177
210
///
178
211
/// It will return an error if unable to connect to the UDP service.
179
212
///
180
- /// # Panics
213
+ /// # Errors
214
+ ///
181
215
pub async fn check ( binding : & SocketAddr ) -> Result < String , String > {
182
216
debug ! ( "Checking Service (detail): {binding:?}." ) ;
183
217
184
- let client = new_udp_tracker_client_connected ( binding. to_string ( ) . as_str ( ) ) . await ;
185
-
186
- let connect_request = ConnectRequest {
187
- transaction_id : TransactionId ( 123 ) ,
188
- } ;
189
-
190
- client. send ( connect_request. into ( ) ) . await ;
191
-
192
- let process = move |response| {
193
- if matches ! ( response, Response :: Connect ( _connect_response) ) {
194
- Ok ( "Connected" . to_string ( ) )
195
- } else {
196
- Err ( "Did not Connect" . to_string ( ) )
197
- }
198
- } ;
199
-
200
- let sleep = time:: sleep ( Duration :: from_millis ( 2000 ) ) ;
201
- tokio:: pin!( sleep) ;
202
-
203
- tokio:: select! {
204
- ( ) = & mut sleep => {
205
- Err ( "Timed Out" . to_string( ) )
206
- }
207
- response = client. receive( ) => {
208
- process( response)
218
+ match new_udp_tracker_client_connected ( binding. to_string ( ) . as_str ( ) ) . await {
219
+ Ok ( client) => {
220
+ let connect_request = ConnectRequest {
221
+ transaction_id : TransactionId ( 123 ) ,
222
+ } ;
223
+
224
+ // client.send() return usize, but doesn't use here
225
+ match client. send ( connect_request. into ( ) ) . await {
226
+ Ok ( _) => ( ) ,
227
+ Err ( e) => debug ! ( "Error: {e:?}." ) ,
228
+ } ;
229
+
230
+ let process = move |response| {
231
+ if matches ! ( response, Response :: Connect ( _connect_response) ) {
232
+ Ok ( "Connected" . to_string ( ) )
233
+ } else {
234
+ Err ( "Did not Connect" . to_string ( ) )
235
+ }
236
+ } ;
237
+
238
+ let sleep = time:: sleep ( Duration :: from_millis ( 2000 ) ) ;
239
+ tokio:: pin!( sleep) ;
240
+
241
+ tokio:: select! {
242
+ ( ) = & mut sleep => {
243
+ Err ( "Timed Out" . to_string( ) )
244
+ }
245
+ response = client. receive( ) => {
246
+ process( response. unwrap( ) )
247
+ }
248
+ }
209
249
}
250
+ Err ( e) => Err ( format ! ( "{e:?}" ) ) ,
210
251
}
211
252
}
0 commit comments