-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathoperations.rs
107 lines (95 loc) · 3.25 KB
/
operations.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use crate::configuration::FailureMode;
use crate::filter::operations::Operation::SendGrpcRequest;
use crate::runtime_action_set::RuntimeActionSet;
use crate::service::{GrpcErrResponse, GrpcRequest, Headers, IndexedGrpcRequest};
use std::rc::Rc;
pub enum Operation {
SendGrpcRequest(GrpcMessageSenderOperation),
AwaitGrpcResponse(GrpcMessageReceiverOperation),
AddHeaders(HeadersOperation),
Die(GrpcErrResponse),
// Done indicates that we have no more operations and can resume the http request flow
Done(),
}
pub struct GrpcMessageSenderOperation {
runtime_action_set: Rc<RuntimeActionSet>,
grpc_request: IndexedGrpcRequest,
}
impl GrpcMessageSenderOperation {
pub fn new(
runtime_action_set: Rc<RuntimeActionSet>,
indexed_request: IndexedGrpcRequest,
) -> Self {
Self {
runtime_action_set,
grpc_request: indexed_request,
}
}
pub fn build_receiver_operation(self) -> (GrpcRequest, GrpcMessageReceiverOperation) {
let index = self.grpc_request.index();
(
self.grpc_request.request(),
GrpcMessageReceiverOperation::new(self.runtime_action_set, index),
)
}
}
pub struct GrpcMessageReceiverOperation {
runtime_action_set: Rc<RuntimeActionSet>,
current_index: usize,
}
impl GrpcMessageReceiverOperation {
pub fn new(runtime_action_set: Rc<RuntimeActionSet>, current_index: usize) -> Self {
Self {
runtime_action_set,
current_index,
}
}
pub fn digest_grpc_response(self, msg: &[u8]) -> Vec<Operation> {
let result = self
.runtime_action_set
.process_grpc_response(self.current_index, msg);
match result {
Ok((next_msg, headers)) => {
let mut operations = Vec::new();
if !headers.is_empty() {
operations.push(Operation::AddHeaders(HeadersOperation::new(headers)))
}
operations.push(match next_msg {
None => Operation::Done(),
Some(indexed_req) => SendGrpcRequest(GrpcMessageSenderOperation::new(
self.runtime_action_set,
indexed_req,
)),
});
operations
}
Err(grpc_err_resp) => vec![Operation::Die(grpc_err_resp)],
}
}
pub fn fail(self) -> Operation {
match self.runtime_action_set.runtime_actions[self.current_index].get_failure_mode() {
FailureMode::Deny => Operation::Die(GrpcErrResponse::new_internal_server_error()),
FailureMode::Allow => match self
.runtime_action_set
.find_next_grpc_request(self.current_index + 1)
{
None => Operation::Done(),
Some(indexed_req) => Operation::SendGrpcRequest(GrpcMessageSenderOperation::new(
self.runtime_action_set,
indexed_req,
)),
},
}
}
}
pub struct HeadersOperation {
headers: Headers,
}
impl HeadersOperation {
pub fn new(headers: Headers) -> Self {
Self { headers }
}
pub fn headers(self) -> Headers {
self.headers
}
}