This repository has been archived by the owner on Feb 10, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 21
/
handshake.rs
188 lines (161 loc) · 7.11 KB
/
handshake.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
//! Using tokio-proto to get a request / response oriented client and server
//!
//! This example is similar to `echo_client_server`, however it also illustrates
//! how to implement a connection handshake before starting to accept requests.
//!
//! In this case, when a client connects to a server, it has to send the
//! following line: `You ready?`. Once the server is ready to accept requests,
//! it responds with: `Bring it!`. If the server wants to reject the client for
//! some reason, it responds with: `No! Go away!`. The client is then expected
//! to close the socket.
//!
//! To do this, we need to implement a `ClientLineProto` and a `ServerLineProto`
//! that handle the handshakes on the client and server side respectively.
extern crate tokio_line as line;
#[macro_use]
extern crate futures;
extern crate tokio_io;
extern crate tokio_core;
extern crate tokio_proto;
extern crate tokio_service;
extern crate service_fn;
use futures::future;
use futures::{Future, Stream, Sink};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::Framed;
use tokio_core::reactor::Core;
use tokio_proto::{TcpClient, TcpServer};
use tokio_proto::pipeline::{ClientProto, ServerProto};
use tokio_service::{Service, NewService};
use service_fn::service_fn;
use std::{io, thread};
use std::net::SocketAddr;
use std::time::Duration;
struct ClientLineProto;
struct ServerLineProto;
/// Start a server, listening for connections on `addr`.
///
/// Similar to the `serve` function in `lib.rs`, however since we are changing
/// the protocol to include ping / pong, we will need to use the tokio-proto
/// builders directly
pub fn serve<T>(addr: SocketAddr, new_service: T)
where T: NewService<Request = String, Response = String, Error = io::Error> + Send + Sync + 'static,
{
// We want responses returned from the provided request handler to be well
// formed. The `Validate` wrapper ensures that all service instances are
// also wrapped with `Validate`.
let new_service = line::Validate::new(new_service);
// Use the tokio-proto TCP server builder, this will handle creating a
// reactor instance and other details needed to run a server.
TcpServer::new(ServerLineProto, addr)
.serve(new_service);
}
impl<T: AsyncRead + AsyncWrite + 'static> ServerProto<T> for ServerLineProto {
type Request = String;
type Response = String;
/// `Framed<T, LineCodec>` is the return value of `io.framed(LineCodec)`
type Transport = Framed<T, line::LineCodec>;
type BindTransport = Box<Future<Item = Self::Transport, Error = io::Error>>;
fn bind_transport(&self, io: T) -> Self::BindTransport {
// Construct the line-based transport
let transport = io.framed(line::LineCodec);
// The handshake requires that the client sends `You ready?`, so wait to
// receive that line. If anything else is sent, error out the connection
let handshake = transport.into_future()
// If the transport errors out, we don't care about the transport
// anymore, so just keep the error
.map_err(|(e, _)| e)
.and_then(|(line, transport)| {
// A line has been received, check to see if it is the handshake
match line {
Some(ref msg) if msg == "You ready?" => {
println!("SERVER: received client handshake");
// Send back the acknowledgement
Box::new(transport.send("Bring it!".to_string())) as Self::BindTransport
}
_ => {
// The client sent an unexpected handshake, error out
// the connection
println!("SERVER: client handshake INVALID");
let err = io::Error::new(io::ErrorKind::Other, "invalid handshake");
Box::new(future::err(err)) as Self::BindTransport
}
}
});
Box::new(handshake)
}
}
impl<T: AsyncRead + AsyncWrite + 'static> ClientProto<T> for ClientLineProto {
type Request = String;
type Response = String;
/// `Framed<T, LineCodec>` is the return value of `io.framed(LineCodec)`
type Transport = Framed<T, line::LineCodec>;
type BindTransport = Box<Future<Item = Self::Transport, Error = io::Error>>;
fn bind_transport(&self, io: T) -> Self::BindTransport {
// Construct the line-based transport
let transport = io.framed(line::LineCodec);
// Send the handshake frame to the server.
let handshake = transport.send("You ready?".to_string())
// Wait for a response from the server, if the transport errors out,
// we don't care about the transport handle anymore, just the error
.and_then(|transport| transport.into_future().map_err(|(e, _)| e))
.and_then(|(line, transport)| {
// The server sent back a line, check to see if it is the
// expected handshake line.
match line {
Some(ref msg) if msg == "Bring it!" => {
println!("CLIENT: received server handshake");
Ok(transport)
}
Some(ref msg) if msg == "No! Go away!" => {
// At this point, the server is at capacity. There are a
// few things that we could do. Set a backoff timer and
// try again in a bit. Or we could try a different
// remote server. However, we're just going to error out
// the connection.
println!("CLIENT: server is at capacity");
let err = io::Error::new(io::ErrorKind::Other, "server at capacity");
Err(err)
}
_ => {
println!("CLIENT: server handshake INVALID");
let err = io::Error::new(io::ErrorKind::Other, "invalid handshake");
Err(err)
}
}
});
Box::new(handshake)
}
}
pub fn main() {
let mut core = Core::new().unwrap();
// This brings up our server.
let addr = "127.0.0.1:12345".parse().unwrap();
thread::spawn(move || {
// Use our `serve` fn
serve(
addr,
|| {
Ok(service_fn(|msg| {
println!("SERVER: {:?}", msg);
Ok(msg)
}))
});
});
// A bit annoying, but we need to wait for the server to connect
thread::sleep(Duration::from_millis(100));
let handle = core.handle();
let client = TcpClient::new(ClientLineProto)
.connect(&addr, &handle)
.map(|client_service| line::Validate::new(client_service));
core.run(
client
.and_then(|client| {
client.call("Goodbye".to_string())
.and_then(|response| {
println!("CLIENT: {:?}", response);
Ok(())
})
})
).unwrap();
}