-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: HTTP headers in Error type #404
Changes from 6 commits
97e14d3
e087f4d
8203776
a83a386
758784a
0da64b3
4e3ff5f
af74d7a
5e83bf6
20a74f5
767aeb8
580d1ea
584c614
40eaebb
5858f61
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,10 @@ import ( | |
"fmt" | ||
"io" | ||
"math" | ||
ihttp "net/http" | ||
"net/http/httptest" | ||
"runtime" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"testing" | ||
Comment on lines
8
to
17
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. activated |
||
|
@@ -265,3 +268,88 @@ func TestFlushWithRetries(t *testing.T) { | |
// two remained | ||
assert.Equal(t, 2, len(service.Lines())) | ||
} | ||
|
||
func TestWriteApiErrorHeaders(t *testing.T) { | ||
calls := 0 | ||
var mu sync.Mutex | ||
server := httptest.NewServer(ihttp.HandlerFunc(func(w ihttp.ResponseWriter, r *ihttp.Request) { | ||
mu.Lock() | ||
defer mu.Unlock() | ||
calls++ | ||
w.Header().Set("X-Test-Val1", "Not All Correct") | ||
w.Header().Set("X-Test-Val2", "Atlas LV-3B") | ||
w.Header().Set("X-Call-Count", strconv.Itoa(calls)) | ||
w.WriteHeader(ihttp.StatusBadRequest) | ||
_, _ = w.Write([]byte(`{ "code": "bad request", "message": "test header" }`)) | ||
})) | ||
defer server.Close() | ||
svc := http.NewService(server.URL, "my-token", http.DefaultOptions()) | ||
writeAPI := NewWriteAPI("my-org", "my-bucket", svc, write.DefaultOptions().SetBatchSize(5)) | ||
defer writeAPI.Close() | ||
errCh := writeAPI.Errors() | ||
var wg sync.WaitGroup | ||
var recErr error | ||
wg.Add(1) | ||
go func() { | ||
for i := 0; i < 3; i++ { | ||
recErr = <-errCh | ||
assert.NotNil(t, recErr, "errCh should not run out of values") | ||
assert.Len(t, recErr.(*http.Error).Header, 6) | ||
assert.NotEqual(t, "", recErr.(*http.Error).Header.Get("Date")) | ||
assert.NotEqual(t, "", recErr.(*http.Error).Header.Get("Content-Length")) | ||
assert.NotEqual(t, "", recErr.(*http.Error).Header.Get("Content-Type")) | ||
assert.Equal(t, strconv.Itoa(i+1), recErr.(*http.Error).Header.Get("X-Call-Count")) | ||
assert.Equal(t, "Not All Correct", recErr.(*http.Error).Header.Get("X-Test-Val1")) | ||
assert.Equal(t, "Atlas LV-3B", recErr.(*http.Error).Header.Get("X-Test-Val2")) | ||
} | ||
wg.Done() | ||
}() | ||
points := test.GenPoints(15) | ||
for i := 0; i < 15; i++ { | ||
writeAPI.WritePoint(points[i]) | ||
} | ||
writeAPI.waitForFlushing() | ||
wg.Wait() | ||
assert.Equal(t, calls, 3) | ||
} | ||
|
||
func TestWriteErrorHeaderToString(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a unit test for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. refactored to error_test.go |
||
header := ihttp.Header{ | ||
"Date": []string{"2024-08-07T12:00:00.009"}, | ||
"Content-Length": []string{"12"}, | ||
"Content-Type": []string{"application/json", "encoding UTF-8"}, | ||
"X-Test-Value1": []string{"SaturnV"}, | ||
"X-Test-Value2": []string{"Apollo11"}, | ||
"Retry-After": []string{"2044"}, | ||
"Trace-Id": []string{"123456789ABCDEF0"}, | ||
} | ||
|
||
err := http.Error{ | ||
StatusCode: ihttp.StatusBadRequest, | ||
Code: "bad request", | ||
Message: "this is just a test", | ||
Err: nil, | ||
RetryAfter: 2044, | ||
Header: header, | ||
} | ||
|
||
fullString := err.HeaderToString([]string{}) | ||
|
||
// write order is not guaranteed | ||
assert.Contains(t, fullString, "Date: 2024-08-07T12:00:00.009") | ||
assert.Contains(t, fullString, "Content-Length: 12") | ||
assert.Contains(t, fullString, "Content-Type: application/json") | ||
assert.Contains(t, fullString, "X-Test-Value1: SaturnV") | ||
assert.Contains(t, fullString, "X-Test-Value2: Apollo11") | ||
assert.Contains(t, fullString, "Retry-After: 2044") | ||
assert.Contains(t, fullString, "Trace-Id: 123456789ABCDEF0") | ||
|
||
filterString := err.HeaderToString([]string{"date", "trace-id", "x-test-value1", "x-test-value2"}) | ||
|
||
// write order will follow filter arguments | ||
assert.Equal(t, filterString, | ||
"Date: 2024-08-07T12:00:00.009\nTrace-Id: 123456789ABCDEF0\nX-Test-Value1: SaturnV\nX-Test-Value2: Apollo11\n", | ||
) | ||
assert.NotContains(t, filterString, "Content-Type: application/json") | ||
assert.NotContains(t, filterString, "Retry-After: 2044") | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -164,6 +164,7 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { | |||||
if batchToWrite != nil { | ||||||
perror := w.WriteBatch(ctx, batchToWrite) | ||||||
if perror != nil { | ||||||
// fmt.Printf("DEBUG perror type %s\n", reflect.TypeOf(perror)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove commented code There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed. |
||||||
if isIgnorableError(perror) { | ||||||
log.Warnf("Write error: %s", perror.Error()) | ||||||
} else { | ||||||
|
@@ -196,9 +197,30 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { | |||||
w.retryAttempts++ | ||||||
log.Debugf("Write proc: next wait for write is %dms\n", w.retryDelay) | ||||||
} else { | ||||||
log.Errorf("Write error: %s\n", perror.Error()) | ||||||
logMessage := fmt.Sprintf("Write error: %s", perror.Error()) | ||||||
logHeaders := perror.HeaderToString([]string{ | ||||||
"date", | ||||||
"trace-id", | ||||||
"trace-sampled", | ||||||
"X-Influxdb-Build", | ||||||
"X-Influxdb-Request-ID", | ||||||
"X-Influxdb-Version", | ||||||
}) | ||||||
if len(logHeaders) > 0 { | ||||||
logMessage += fmt.Sprintf("\nSelect Response Headers:\n%s", logHeaders) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changed to "Selected" |
||||||
} | ||||||
log.Error(logMessage) | ||||||
} | ||||||
return &http2.Error{ | ||||||
StatusCode: int(perror.StatusCode), | ||||||
Code: perror.Code, | ||||||
Message: fmt.Errorf( | ||||||
"write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror, | ||||||
).Error(), | ||||||
Err: perror.Err, | ||||||
RetryAfter: perror.RetryAfter, | ||||||
Header: perror.Header, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't look good from the API perspective to return The solution could be to create a write error that would wrap the previous: package write
type Error struct {
origin error
message string
}
func NewError(err error, message string) *Error {
return &Error{
origin: err,
message: message,
}
}
func (e *Error) Error() string {
return fmt.Sprintf("%s: %s", e.message, e.origin)
}
func (e *Error) Unwrap() error {
return e.origin
} Then this code would be: return write.NewError(perror, fmt.Sprintf("write failed (attempts %d)", batchToWrite.RetryAttempts)) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree. When I wrote it, I was thinking there has to be a more elegant way of implementing this. I'm still fairly new to golang and the suggested solution did not occur to me. Implemented as suggested, and with helper methods for extracting header values. |
||||||
} | ||||||
return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror) | ||||||
} | ||||||
} | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -337,10 +337,13 @@ func TestMaxRetryTime(t *testing.T) { | |
b := NewBatch("2\n", exp) | ||
// First batch will be checked against maxRetryTime and it will expire. New batch will fail and it will added to retry queue | ||
err = srv.HandleWrite(ctx, b) | ||
//fmt.Printf("DEBUG err %v\n", err) | ||
//fmt.Printf("DEBUG err %v\n", err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove commented code There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed |
||
require.NotNil(t, err) | ||
// 1st Batch expires and writing 2nd trows error | ||
assert.Equal(t, "write failed (attempts 1): Unexpected status code 429", err.Error()) | ||
assert.Equal(t, 1, srv.retryQueue.list.Len()) | ||
// fmt.Printf("DEBUG Header len: %d\n", len(err.(*http.Error).Header)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove commented code There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed |
||
|
||
//wait until remaining accumulated retryDelay has passed, because there hasn't been a successful write yet | ||
<-time.After(time.Until(srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.retryDelay)))) | ||
|
@@ -702,3 +705,20 @@ func TestIgnoreErrors(t *testing.T) { | |
err = srv.HandleWrite(ctx, b) | ||
assert.Error(t, err) | ||
} | ||
|
||
func TestHttpErrorHeaders(t *testing.T) { | ||
server := httptest.NewServer(ihttp.HandlerFunc(func(w ihttp.ResponseWriter, r *ihttp.Request) { | ||
w.Header().Set("X-Test-Val1", "Not All Correct") | ||
w.Header().Set("X-Test-Val2", "Atlas LV-3B") | ||
w.WriteHeader(ihttp.StatusBadRequest) | ||
_, _ = w.Write([]byte(`{ "code": "bad request", "message": "test header" }`)) | ||
})) | ||
defer server.Close() | ||
svc := NewService("my-org", "my-bucket", http.NewService(server.URL, "", http.DefaultOptions()), | ||
write.DefaultOptions()) | ||
err := svc.HandleWrite(context.Background(), NewBatch("1", 20)) | ||
assert.Error(t, err) | ||
assert.Equal(t, "400 Bad Request: write failed (attempts 0): 400 Bad Request: { \"code\": \"bad request\", \"message\": \"test header\" }", err.Error()) | ||
assert.Equal(t, "Not All Correct", err.(*http.Error).Header.Get("X-Test-Val1")) | ||
assert.Equal(t, "Atlas LV-3B", err.(*http.Error).Header.Get("X-Test-Val2")) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.