-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.rs
121 lines (105 loc) · 3.55 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
mod dhi;
use dhi::{DHIRequest, DHIResponse};
mod errors;
use errors::AppError;
mod config;
use config::AppConfig;
use actix_web::{web, App, Error, HttpResponse, HttpServer};
use async_std::net::TcpStream;
use async_std::prelude::*;
use futures::StreamExt;
use serde_json::Value;
use serde_xml_rs::from_reader;
use std::path::PathBuf;
use structopt::StructOpt;
// Command-line options for the application; `main` obtains them through
// `Opt::from_args()`.
// NOTE: plain `//` comments are used here on purpose. `structopt` feeds `///`
// doc comments into the generated help text, so adding doc comments to this
// struct or its fields would change the program's CLI output.
#[derive(Debug, StructOpt)]
#[structopt(
name = "proust",
about = "Application for testing Bank credit card processing systems."
)]
struct Opt {
/// Configuration file
// Parsed straight from the raw OS string (`from_os_str`) into a `PathBuf`;
// `main` later converts it to `&str` before handing it to `AppConfig::new`.
#[structopt(parse(from_os_str))]
config: PathBuf,
}
// TODO: Impl AppState
/// Application state made available to request handlers as `web::Data<AppState>`.
///
/// `main` builds a single instance and registers a clone of the `web::Data`
/// handle with every `App` created by the server factory.
// NOTE(review): all handlers therefore appear to share this one TCP stream;
// concurrent requests could interleave their writes/reads — confirm whether
// access needs to be serialized.
struct AppState {
/// TCP connection to the DHI host, opened in `main` before the HTTP server starts.
host_stream: TcpStream,
}
/// Asynchronously exchange data with DHI host.
///
/// Writes `msg` to the shared host stream, performs a single read of up to
/// 8192 bytes, skips the 5-byte length prefix and deserializes the remaining
/// received bytes from XML into a [`DHIResponse`].
///
/// # Errors
///
/// Returns an [`AppError`] when:
/// * writing to or reading from the stream fails;
/// * the host closes the connection or sends nothing beyond the length prefix
///   (reported as an `UnexpectedEof` I/O error);
/// * the received payload cannot be deserialized.
async fn talk_to_dhi_host(data: web::Data<AppState>, msg: String) -> Result<DHIResponse, AppError> {
    // The first 5 bytes are the message length
    const LENGTH_PREFIX_LEN: usize = 5;

    let mut stream = &data.host_stream;
    stream.write_all(msg.as_bytes()).await?;
    println!("{}", msg);

    let mut buffer = [0u8; 8192];
    let received = stream.read(&mut buffer).await?;

    // A read of 0 bytes means the peer closed the connection; anything that does
    // not extend past the prefix carries no payload to parse.
    if received <= LENGTH_PREFIX_LEN {
        return Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "DHI host closed the connection or sent an empty reply",
        )
        .into());
    }

    // Parse only the bytes actually received, not the zero padding behind them.
    // NOTE(review): a reply larger than the buffer, or one split across several
    // TCP segments, is still not reassembled — doing so requires decoding the
    // length prefix, whose encoding is not visible here.
    let response: DHIResponse = from_reader(&buffer[LENGTH_PREFIX_LEN..received])?;
    Ok(response)
}
// TODO: write tests to cover all the unwrapping
async fn serve_dhi_request(
data: web::Data<AppState>,
mut body: web::Payload,
) -> Result<HttpResponse, Error> {
let body = body.next().await.unwrap()?;
let iso_data = String::from_utf8(body.to_vec()).unwrap();
let iso_obj: Value = serde_json::from_str(&iso_data).unwrap();
let r: DHIRequest = DHIRequest::new(iso_obj);
let msg = r.serialize().unwrap();
let res = talk_to_dhi_host(data, msg).await;
match res {
Ok(res) => Ok(HttpResponse::Ok()
.content_type("application/json")
.header("X-Hdr", "sample")
.body(res.serialize().unwrap())),
Err(err) => match err {
AppError::IoError(err) => {
println!("Error: {:?}", err);
Ok(HttpResponse::ServiceUnavailable()
.content_type("plain/text")
.body("Error communicating with DHI host"))
}
AppError::ParseError(err) => {
println!("Error: {:?}", err);
Ok(HttpResponse::InternalServerError()
.content_type("plain/text")
.body("Error processing data from DHI host"))
}
AppError::SerializeError(err) => {
println!("Error: {:?}", err);
Ok(HttpResponse::InternalServerError()
.content_type("plain/text")
.body("Serialization error"))
}
_ => {
println!("Error: {:?}", err);
Ok(HttpResponse::InternalServerError()
.content_type("plain/text")
.body("Internal error"))
}
},
}
}
/// Program entry point.
///
/// Parses the command line, loads the configuration file, connects to the DHI
/// host named in the configuration and then runs the HTTP server, which
/// exposes `POST /dhi`.
///
/// # Errors
///
/// Returns an I/O error when the configuration path is not valid UTF-8, the
/// configuration cannot be loaded, the DHI host entry is not a string, the TCP
/// connection or `set_nodelay` fails, or the HTTP listener cannot be bound or
/// run.
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    let opt = Opt::from_args();

    let config_path = opt.config.to_str().ok_or_else(|| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "configuration file path is not valid UTF-8",
        )
    })?;
    let cfg = AppConfig::new(config_path).map_err(|err| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("failed to load configuration: {:?}", err),
        )
    })?;

    // TODO: iterate through channels
    // NOTE(review): depending on the type of `channels`, the indexing itself may
    // still panic when a key is absent — confirm against `AppConfig`.
    let dhi_host = cfg.channels["dhi"]["host"].as_str().ok_or_else(|| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "configuration value channels.dhi.host is missing or not a string",
        )
    })?;

    let app_state = web::Data::new(AppState {
        host_stream: TcpStream::connect(dhi_host).await?,
    });
    app_state.host_stream.set_nodelay(true)?;
    println!("Connected to {}", dhi_host);

    // Each `App` built by the factory receives a clone of the same shared state.
    HttpServer::new(move || {
        App::new()
            .app_data(app_state.clone())
            .route("/dhi", web::post().to(serve_dhi_request))
    })
    .workers(cfg.get_num_of_workers())
    .keep_alive(cfg.get_listener_keep_alive())
    .bind(cfg.get_conn_str())?
    .run()
    .await
}