
Commit 6046d6e

Add documentation on HealthObserve trait (#5)
* Add documentation on HealthObserve trait
* Update Cargo.toml
* Update server_utils.rs
* Update test_upstream.rs
* Update proxy_cache.rs
* Create rate_limiter.rs
* Update index.md
* Rename rate_limiter.rs to rate_limiter.md
* Update key.rs
* Update lib.rs
* Update brotli.rs
* Update gzip.rs
* Update mod.rs
* Update client.rs
* Update raw_connect.rs
* Update health_check.rs
* Update lib.rs
* Update Cargo.toml
* Create rate_limiter.rs
* Update lib.rs
* Update raw_connect.rs
* Update lib.rs
* Update gzip.rs
* Apply rustfmt changes (#1)
* up (#2)
  * Apply rustfmt changes
  * Apply rustfmt changes
* Dev (#3)
  * Apply rustfmt changes
  * Apply rustfmt changes
  * Apply rustfmt changes
  * git update

Signed-off-by: gitworkflows <118260833+gitworkflows@users.noreply.github.com>
Co-authored-by: gitworkflows <118260833+gitworkflows@users.noreply.github.com>
Co-authored-by: FortiShield <161459699+FortiShield@users.noreply.github.com>
1 parent 5a8a581 commit 6046d6e

File tree

20 files changed: +553 −55 lines


bongonet-cache/src/key.rs (+1 −1)

@@ -130,7 +130,7 @@ impl CacheKey {
 
 /// Storage optimized cache key to keep in memory or in storage
 // 16 bytes + 8 bytes (+16 * u8) + user_tag.len() + 16 Bytes (Box<str>)
-#[derive(Debug, Deserialize, Serialize, Clone, Hash, PartialEq, Eq)]
+#[derive(Debug, Deserialize, Serialize, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
 pub struct CompactCacheKey {
     pub primary: HashBinary,
     // save 8 bytes for non-variance but waste 8 bytes for variance vs, store flat 16 bytes
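Deriving `PartialOrd` and `Ord` makes `CompactCacheKey` usable in ordered contexts. The commit does not show a caller, so the following is only a sketch of the kind of use the new bounds allow; the `BTreeSet` collection and the import path are illustrative assumptions:

```rust
use std::collections::BTreeSet;

use bongonet_cache::key::CompactCacheKey;

// Illustrative only: with Ord derived, compact keys can be stored in an
// ordered set, e.g. for deterministic iteration over stored entries.
fn to_ordered_set(keys: Vec<CompactCacheKey>) -> BTreeSet<CompactCacheKey> {
    keys.into_iter().collect()
}
```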

bongonet-cache/src/lib.rs (+19 −3)

@@ -77,6 +77,8 @@ pub enum CachePhase {
     Miss,
     /// A staled (expired) asset is found
     Stale,
+    /// A staled (expired) asset was found, but another request is revalidating it
+    StaleUpdating,
     /// A staled (expired) asset was found, so a fresh one was fetched
     Expired,
     /// A staled (expired) asset was found, and it was revalidated to be fresh

@@ -96,6 +98,7 @@ impl CachePhase {
             CachePhase::Hit => "hit",
             CachePhase::Miss => "miss",
             CachePhase::Stale => "stale",
+            CachePhase::StaleUpdating => "stale-updating",
             CachePhase::Expired => "expired",
             CachePhase::Revalidated => "revalidated",
             CachePhase::RevalidatedNoCache(_) => "revalidated-nocache",

@@ -260,7 +263,7 @@ impl HttpCache {
         use CachePhase::*;
         match self.phase {
             Disabled(_) | Bypass | Miss | Expired | Revalidated | RevalidatedNoCache(_) => true,
-            Hit | Stale => false,
+            Hit | Stale | StaleUpdating => false,
             Uninit | CacheKey => false, // invalid states for this call, treat them as false to keep it simple
         }
     }

@@ -493,7 +496,8 @@ impl HttpCache {
         match self.phase {
             // from CacheKey: set state to miss during cache lookup
             // from Bypass: response became cacheable, set state to miss to cache
-            CachePhase::CacheKey | CachePhase::Bypass => {
+            // from Stale: waited for cache lock, then retried and found asset was gone
+            CachePhase::CacheKey | CachePhase::Bypass | CachePhase::Stale => {
                 self.phase = CachePhase::Miss;
                 self.inner_mut().traces.start_miss_span();
             }

@@ -508,6 +512,7 @@ impl HttpCache {
         match self.phase {
             CachePhase::Hit
             | CachePhase::Stale
+            | CachePhase::StaleUpdating
             | CachePhase::Revalidated
             | CachePhase::RevalidatedNoCache(_) => self.inner_mut().body_reader.as_mut().unwrap(),
             _ => panic!("wrong phase {:?}", self.phase),

@@ -543,6 +548,7 @@ impl HttpCache {
             | CachePhase::Miss
             | CachePhase::Expired
             | CachePhase::Stale
+            | CachePhase::StaleUpdating
             | CachePhase::Revalidated
             | CachePhase::RevalidatedNoCache(_) => {
                 let inner = self.inner_mut();

@@ -785,6 +791,14 @@ impl HttpCache {
         // TODO: remove this asset from cache once finished?
     }
 
+    /// Mark this asset as stale, but being updated separately from this request.
+    pub fn set_stale_updating(&mut self) {
+        match self.phase {
+            CachePhase::Stale => self.phase = CachePhase::StaleUpdating,
+            _ => panic!("wrong phase {:?}", self.phase),
+        }
+    }
+
     /// Update the variance of the [CacheMeta].
     ///
     /// Note that this process may change the lookup `key`, and eventually (when the asset is

@@ -853,6 +867,7 @@ impl HttpCache {
         match self.phase {
             // TODO: allow in Bypass phase?
             CachePhase::Stale
+            | CachePhase::StaleUpdating
             | CachePhase::Expired
             | CachePhase::Hit
             | CachePhase::Revalidated

@@ -881,6 +896,7 @@ impl HttpCache {
         match self.phase {
             CachePhase::Miss
             | CachePhase::Stale
+            | CachePhase::StaleUpdating
             | CachePhase::Expired
             | CachePhase::Hit
             | CachePhase::Revalidated

@@ -1005,7 +1021,7 @@ impl HttpCache {
 
     /// Whether this request's cache hit is staled
     fn has_staled_asset(&self) -> bool {
-        self.phase == CachePhase::Stale
+        matches!(self.phase, CachePhase::Stale | CachePhase::StaleUpdating)
     }
 
     /// Whether this asset is staled and stale if error is allowed
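The new `StaleUpdating` phase marks a stale hit whose refresh is already in flight on another request; like `Stale`, it is a non-writable but servable state (see `has_staled_asset` above). A minimal sketch of how a caller might drive the transition, where `being_revalidated_elsewhere` is a hypothetical stand-in for whatever cache-lock check an application performs:

```rust
use bongonet_cache::HttpCache;

// Sketch only: `cache` is an HttpCache left in CachePhase::Stale by lookup.
fn mark_if_updating(cache: &mut HttpCache, being_revalidated_elsewhere: bool) {
    if being_revalidated_elsewhere {
        // Transitions Stale -> StaleUpdating (panics in any other phase);
        // as_str() then reports "stale-updating" while the stale body is served.
        cache.set_stale_updating();
    }
}
```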

bongonet-core/src/protocols/http/compression/brotli.rs (−1)

@@ -42,7 +42,6 @@ impl Decompressor {
 
 impl Encode for Decompressor {
     fn encode(&mut self, input: &[u8], end: bool) -> Result<Bytes> {
-        // reserve at most 16k
         const MAX_INIT_COMPRESSED_SIZE_CAP: usize = 4 * 1024;
         // Brotli compress ratio can be 3.5 to 4.5
         const ESTIMATED_COMPRESSION_RATIO: usize = 4;

bongonet-core/src/protocols/http/compression/gzip.rs (+85 −4)

@@ -12,15 +12,65 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use super::Encode;
+use super::{Encode, COMPRESSION_ERROR};
 
-use bongonet_error::Result;
+use bongonet_error::{OrErr, Result};
 use bytes::Bytes;
-use flate2::write::GzEncoder;
+use flate2::write::{GzDecoder, GzEncoder};
 use std::io::Write;
 use std::time::{Duration, Instant};
 
-// TODO: unzip
+pub struct Decompressor {
+    decompress: GzDecoder<Vec<u8>>,
+    total_in: usize,
+    total_out: usize,
+    duration: Duration,
+}
+
+impl Decompressor {
+    pub fn new() -> Self {
+        Decompressor {
+            decompress: GzDecoder::new(vec![]),
+            total_in: 0,
+            total_out: 0,
+            duration: Duration::new(0, 0),
+        }
+    }
+}
+
+impl Encode for Decompressor {
+    fn encode(&mut self, input: &[u8], end: bool) -> Result<Bytes> {
+        const MAX_INIT_COMPRESSED_SIZE_CAP: usize = 4 * 1024;
+        const ESTIMATED_COMPRESSION_RATIO: usize = 3; // estimated 2.5-3x compression
+        let start = Instant::now();
+        self.total_in += input.len();
+        // cap the buf size amplification, there is a DoS risk of always allocate
+        // 3x the memory of the input buffer
+        let reserve_size = if input.len() < MAX_INIT_COMPRESSED_SIZE_CAP {
+            input.len() * ESTIMATED_COMPRESSION_RATIO
+        } else {
+            input.len()
+        };
+        self.decompress.get_mut().reserve(reserve_size);
+        self.decompress
+            .write_all(input)
+            .or_err(COMPRESSION_ERROR, "while decompress Gzip")?;
+        // write to vec will never fail, only possible error is that the input data
+        // was not actually gzip compressed
+        if end {
+            self.decompress
+                .try_finish()
+                .or_err(COMPRESSION_ERROR, "while decompress Gzip")?;
+        }
+        self.total_out += self.decompress.get_ref().len();
+        self.duration += start.elapsed();
+        Ok(std::mem::take(self.decompress.get_mut()).into()) // into() Bytes will drop excess capacity
+    }
+
+    fn stat(&self) -> (&'static str, usize, usize, Duration) {
+        ("de-gzip", self.total_in, self.total_out, self.duration)
+    }
+}
 
 pub struct Compressor {
     // TODO: enum for other compression algorithms

@@ -66,6 +116,20 @@ impl Encode for Compressor {
 }
 
 use std::ops::{Deref, DerefMut};
+impl Deref for Decompressor {
+    type Target = GzDecoder<Vec<u8>>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.decompress
+    }
+}
+
+impl DerefMut for Decompressor {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.decompress
+    }
+}
+
 impl Deref for Compressor {
     type Target = GzEncoder<Vec<u8>>;
 
@@ -100,4 +164,21 @@ mod tests_stream {
 
         assert!(compressor.get_ref().is_empty());
     }
+
+    #[test]
+    fn gunzip_data() {
+        let mut decompressor = Decompressor::new();
+
+        let compressed_bytes = &[
+            0x1f, 0x8b, 0x08, 0, 0, 0, 0, 0, 0, 255, 75, 76, 74, 78, 73, 77, 75, 7, 0, 166, 106,
+            42, 49, 7, 0, 0, 0,
+        ];
+        let decompressed = decompressor.encode(compressed_bytes, true).unwrap();
+
+        assert_eq!(&decompressed[..], b"abcdefg");
+        assert_eq!(decompressor.total_in, compressed_bytes.len());
+        assert_eq!(decompressor.total_out, decompressed.len());
+
+        assert!(decompressor.get_ref().is_empty());
+    }
 }
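Since `Encode::encode` is streaming, the new test's single call with `end = true` is the simplest case; a caller can also feed the body chunk by chunk, passing `end = false` until the last write. A sketch of such a test as it might sit alongside `gunzip_data`, reusing the same gzip bytes with an arbitrary split point (this test is not part of the commit):

```rust
#[test]
fn gunzip_data_chunked() {
    // Sketch: decode the same stream in two writes; only the final call
    // passes end = true so the decoder can finish and validate the trailer.
    let mut decompressor = Decompressor::new();
    let compressed_bytes: &[u8] = &[
        0x1f, 0x8b, 0x08, 0, 0, 0, 0, 0, 0, 255, 75, 76, 74, 78, 73, 77, 75, 7, 0, 166, 106,
        42, 49, 7, 0, 0, 0,
    ];
    let (head, tail) = compressed_bytes.split_at(10);
    let mut out = Vec::new();
    out.extend_from_slice(&decompressor.encode(head, false).unwrap());
    out.extend_from_slice(&decompressor.encode(tail, true).unwrap());
    assert_eq!(&out[..], b"abcdefg");
}
```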

bongonet-core/src/protocols/http/compression/mod.rs (+28 −19)

@@ -67,10 +67,10 @@ pub struct ResponseCompressionCtx(CtxInner);
 
 enum CtxInner {
     HeaderPhase {
-        decompress_enable: bool,
         // Store the preferred list to compare with content-encoding
        accept_encoding: Vec<Algorithm>,
        encoding_levels: [u32; Algorithm::COUNT],
+        decompress_enable: [bool; Algorithm::COUNT],
     },
     BodyPhase(Option<Box<dyn Encode + Send + Sync>>),
 }

@@ -81,9 +81,9 @@ impl ResponseCompressionCtx {
     /// The `decompress_enable` flag will tell the ctx to decompress if needed.
     pub fn new(compression_level: u32, decompress_enable: bool) -> Self {
         Self(CtxInner::HeaderPhase {
-            decompress_enable,
             accept_encoding: Vec::new(),
             encoding_levels: [compression_level; Algorithm::COUNT],
+            decompress_enable: [decompress_enable; Algorithm::COUNT],
         })
     }
 
@@ -93,9 +93,9 @@
         match &self.0 {
             CtxInner::HeaderPhase {
                 decompress_enable,
-                accept_encoding: _,
                 encoding_levels: levels,
-            } => levels.iter().any(|l| *l != 0) || *decompress_enable,
+                ..
+            } => levels.iter().any(|l| *l != 0) || decompress_enable.iter().any(|d| *d),
             CtxInner::BodyPhase(c) => c.is_some(),
         }
     }

@@ -104,11 +104,7 @@
     /// algorithm name, in bytes, out bytes, time took for the compression
     pub fn get_info(&self) -> Option<(&'static str, usize, usize, Duration)> {
         match &self.0 {
-            CtxInner::HeaderPhase {
-                decompress_enable: _,
-                accept_encoding: _,
-                encoding_levels: _,
-            } => None,
+            CtxInner::HeaderPhase { .. } => None,
             CtxInner::BodyPhase(c) => c.as_ref().map(|c| c.stat()),
         }
     }

@@ -119,9 +115,8 @@
     pub fn adjust_level(&mut self, new_level: u32) {
         match &mut self.0 {
             CtxInner::HeaderPhase {
-                decompress_enable: _,
-                accept_encoding: _,
                 encoding_levels: levels,
+                ..
             } => {
                 *levels = [new_level; Algorithm::COUNT];
             }

@@ -135,27 +130,38 @@
     pub fn adjust_algorithm_level(&mut self, algorithm: Algorithm, new_level: u32) {
         match &mut self.0 {
             CtxInner::HeaderPhase {
-                decompress_enable: _,
-                accept_encoding: _,
                 encoding_levels: levels,
+                ..
             } => {
                 levels[algorithm.index()] = new_level;
             }
             CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
         }
     }
 
-    /// Adjust the decompression flag.
+    /// Adjust the decompression flag for all compression algorithms.
     /// # Panic
     /// This function will panic if it has already started encoding the response body.
     pub fn adjust_decompression(&mut self, enabled: bool) {
         match &mut self.0 {
             CtxInner::HeaderPhase {
-                decompress_enable,
-                accept_encoding: _,
-                encoding_levels: _,
+                decompress_enable, ..
             } => {
-                *decompress_enable = enabled;
+                *decompress_enable = [enabled; Algorithm::COUNT];
+            }
+            CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
+        }
+    }
+
+    /// Adjust the decompression flag for a specific algorithm.
+    /// # Panic
+    /// This function will panic if it has already started encoding the response body.
+    pub fn adjust_algorithm_decompression(&mut self, algorithm: Algorithm, enabled: bool) {
+        match &mut self.0 {
+            CtxInner::HeaderPhase {
+                decompress_enable, ..
+            } => {
+                decompress_enable[algorithm.index()] = enabled;
             }
             CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
         }

@@ -208,7 +214,9 @@ impl ResponseCompressionCtx {
         let encoder = match action {
             Action::Noop => None,
             Action::Compress(algorithm) => algorithm.compressor(levels[algorithm.index()]),
-            Action::Decompress(algorithm) => algorithm.decompressor(*decompress_enable),
+            Action::Decompress(algorithm) => {
+                algorithm.decompressor(decompress_enable[algorithm.index()])
+            }
         };
         if encoder.is_some() {
             adjust_response_header(resp, &action);

@@ -317,6 +325,7 @@ impl Algorithm {
             None
         } else {
             match self {
+                Self::Gzip => Some(Box::new(gzip::Decompressor::new())),
                 Self::Brotli => Some(Box::new(brotli::Decompressor::new())),
                 _ => None, // not implemented
             }
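With `decompress_enable` now an array indexed per algorithm, decompression can be enabled selectively rather than all-or-nothing. A short sketch of the header-phase setup; the level and flags are illustrative values, and the import path is assumed from the file's location:

```rust
use bongonet_core::protocols::http::compression::{Algorithm, ResponseCompressionCtx};

// Sketch: level 0 disables recompression; decompression starts disabled for
// every algorithm, then is opted into for gzip only, so brotli responses
// pass through unmodified.
fn gzip_only_decompression() -> ResponseCompressionCtx {
    let mut ctx = ResponseCompressionCtx::new(0, false);
    ctx.adjust_algorithm_decompression(Algorithm::Gzip, true);
    ctx
}
```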

bongonet-core/src/protocols/http/v2/client.rs (+15 −2)

@@ -349,6 +349,19 @@ impl Http2Session {
         if self.ping_timedout() {
             e.etype = PING_TIMEDOUT;
         }
+
+        // is_go_away: retry via another connection, this connection is being teardown
+        // should retry
+        if self.response_header.is_none() {
+            if let Some(err) = e.root_cause().downcast_ref::<h2::Error>() {
+                if err.is_go_away()
+                    && err.is_remote()
+                    && err.reason().map_or(false, |r| r == h2::Reason::NO_ERROR)
+                {
+                    e.retry = true.into();
+                }
+            }
+        }
         e
     }
 }

@@ -367,7 +380,7 @@ pub fn write_body(send_body: &mut SendStream<Bytes>, data: Bytes, end: bool) ->
 /* Types of errors during h2 header read
     1. peer requests to downgrade to h1, mostly IIS server for NTLM: we will downgrade and retry
     2. peer sends invalid h2 frames, usually sending h1 only header: we will downgrade and retry
-    3. peer sends GO_AWAY(NO_ERROR) on reused conn, usually hit http2_max_requests: we will retry
+    3. peer sends GO_AWAY(NO_ERROR) connection is being shut down: we will retry
     4. peer IO error on reused conn, usually firewall kills old conn: we will retry
     5. All other errors will terminate the request
 */

@@ -395,7 +408,7 @@ fn handle_read_header_error(e: h2::Error) -> Box<Error> {
         // is_go_away: retry via another connection, this connection is being teardown
         // only retry if the connection is reused
         let mut err = Error::because(H2Error, "while reading h2 header", e);
-        err.retry = RetryType::ReusedOnly;
+        err.retry = true.into();
         err
     } else if e.is_io() {
         // is_io: typical if a previously reused connection silently drops it
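The new retry condition amounts to: the request produced no response header and the connection failed with a graceful remote shutdown, so replaying it on a fresh connection is safe. The predicate in isolation, using only the `h2` crate calls that appear in the diff:

```rust
use h2::{Error as H2Error, Reason};

// True when the peer sent GOAWAY with NO_ERROR, i.e. a graceful shutdown
// (such as connection rotation) rather than a protocol failure.
fn is_graceful_goaway(err: &H2Error) -> bool {
    err.is_go_away() && err.is_remote() && err.reason().map_or(false, |r| r == Reason::NO_ERROR)
}
```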
