Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: HTTP headers in Error type #404

Merged
merged 15 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 2.14 [unreleased]

### Features

- [#404](https://github.com/influxdata/influxdb-client-go/pull/404) Expose HTTP response headers in the Error type to aid analysis and debugging of error results. Add selected response headers to the error log.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- [#404](https://github.com/influxdata/influxdb-client-go/pull/404) Expose HTTP response headers in the Error type to aid analysis and debugging of error results. Add selected response headers to the error log.
- [#404](https://github.com/influxdata/influxdb-client-go/pull/404) Expose HTTP response headers in the Error type to aid analysis and debugging of error results. Add selected response headers to the error log.
Also, unified errors returned by WriteAPI, which now always returns `http.Error`


## 2.13.0 [2023-12-05]

### Features
Expand Down
2 changes: 2 additions & 0 deletions api/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/influxdata/influxdb-client-go/v2/api"
apiHttp "github.com/influxdata/influxdb-client-go/v2/api/http"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/influxdata/influxdb-client-go/v2/domain"
influxdb2 "github.com/influxdata/influxdb-client-go/v2/internal/examples"
Expand Down Expand Up @@ -123,6 +124,7 @@ func ExampleWriteAPI_errors() {
go func() {
for err := range errorsCh {
fmt.Printf("write error: %s\n", err.Error())
fmt.Printf("trace-id: %s\n", err.(*apiHttp.Error).Header.Get("Trace-ID"))
}
}()
// write some points
Expand Down
24 changes: 24 additions & 0 deletions api/http/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package http

import (
"fmt"
"net/http"
"strconv"
)

Expand All @@ -16,6 +17,7 @@ type Error struct {
Message string
Err error
RetryAfter uint
Header http.Header
}

// Error fulfils error interface
Expand All @@ -25,6 +27,8 @@ func (e *Error) Error() string {
return e.Err.Error()
case e.Code != "" && e.Message != "":
return fmt.Sprintf("%s: %s", e.Code, e.Message)
case e.Message != "":
return e.Message
default:
return "Unexpected status code " + strconv.Itoa(e.StatusCode)
}
Expand All @@ -37,6 +41,25 @@ func (e *Error) Unwrap() error {
return nil
}

// HeaderToString generates a string value from the Header property. Useful in logging.
func (e *Error) HeaderToString(selected []string) string {
headerString := ""
if len(selected) == 0 {
for key := range e.Header {
k := http.CanonicalHeaderKey(key)
headerString += fmt.Sprintf("%s: %s\r\n", k, e.Header.Get(k))
}
} else {
for _, candidate := range selected {
c := http.CanonicalHeaderKey(candidate)
if e.Header.Get(c) != "" {
headerString += fmt.Sprintf("%s: %s\n", c, e.Header.Get(c))
}
}
}
return headerString
}

// NewError returns newly created Error initialised with nested error and default values
func NewError(err error) *Error {
return &Error{
Expand All @@ -45,5 +68,6 @@ func NewError(err error) *Error {
Message: "",
Err: err,
RetryAfter: 0,
Header: http.Header{},
}
}
1 change: 1 addition & 0 deletions api/http/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func (s *service) parseHTTPError(r *http.Response) *Error {

perror := NewError(nil)
perror.StatusCode = r.StatusCode
perror.Header = r.Header

if v := r.Header.Get("Retry-After"); v != "" {
r, err := strconv.ParseUint(v, 10, 32)
Expand Down
88 changes: 88 additions & 0 deletions api/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (
"fmt"
"io"
"math"
ihttp "net/http"
"net/http/httptest"
"runtime"
"strconv"
"strings"
"sync"
"testing"
Comment on lines 8 to 17
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use go imports

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

activated goimports in IDE. Updated.

Expand Down Expand Up @@ -265,3 +268,88 @@ func TestFlushWithRetries(t *testing.T) {
// two remained
assert.Equal(t, 2, len(service.Lines()))
}

func TestWriteApiErrorHeaders(t *testing.T) {
calls := 0
var mu sync.Mutex
server := httptest.NewServer(ihttp.HandlerFunc(func(w ihttp.ResponseWriter, r *ihttp.Request) {
mu.Lock()
defer mu.Unlock()
calls++
w.Header().Set("X-Test-Val1", "Not All Correct")
w.Header().Set("X-Test-Val2", "Atlas LV-3B")
w.Header().Set("X-Call-Count", strconv.Itoa(calls))
w.WriteHeader(ihttp.StatusBadRequest)
_, _ = w.Write([]byte(`{ "code": "bad request", "message": "test header" }`))
}))
defer server.Close()
svc := http.NewService(server.URL, "my-token", http.DefaultOptions())
writeAPI := NewWriteAPI("my-org", "my-bucket", svc, write.DefaultOptions().SetBatchSize(5))
defer writeAPI.Close()
errCh := writeAPI.Errors()
var wg sync.WaitGroup
var recErr error
wg.Add(1)
go func() {
for i := 0; i < 3; i++ {
recErr = <-errCh
assert.NotNil(t, recErr, "errCh should not run out of values")
assert.Len(t, recErr.(*http.Error).Header, 6)
assert.NotEqual(t, "", recErr.(*http.Error).Header.Get("Date"))
assert.NotEqual(t, "", recErr.(*http.Error).Header.Get("Content-Length"))
assert.NotEqual(t, "", recErr.(*http.Error).Header.Get("Content-Type"))
assert.Equal(t, strconv.Itoa(i+1), recErr.(*http.Error).Header.Get("X-Call-Count"))
assert.Equal(t, "Not All Correct", recErr.(*http.Error).Header.Get("X-Test-Val1"))
assert.Equal(t, "Atlas LV-3B", recErr.(*http.Error).Header.Get("X-Test-Val2"))
}
wg.Done()
}()
points := test.GenPoints(15)
for i := 0; i < 15; i++ {
writeAPI.WritePoint(points[i])
}
writeAPI.waitForFlushing()
wg.Wait()
assert.Equal(t, calls, 3)
}

func TestWriteErrorHeaderToString(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a unit test for http.Error. Move this to error_test.go

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refactored to error_test.go

header := ihttp.Header{
"Date": []string{"2024-08-07T12:00:00.009"},
"Content-Length": []string{"12"},
"Content-Type": []string{"application/json", "encoding UTF-8"},
"X-Test-Value1": []string{"SaturnV"},
"X-Test-Value2": []string{"Apollo11"},
"Retry-After": []string{"2044"},
"Trace-Id": []string{"123456789ABCDEF0"},
}

err := http.Error{
StatusCode: ihttp.StatusBadRequest,
Code: "bad request",
Message: "this is just a test",
Err: nil,
RetryAfter: 2044,
Header: header,
}

fullString := err.HeaderToString([]string{})

// write order is not guaranteed
assert.Contains(t, fullString, "Date: 2024-08-07T12:00:00.009")
assert.Contains(t, fullString, "Content-Length: 12")
assert.Contains(t, fullString, "Content-Type: application/json")
assert.Contains(t, fullString, "X-Test-Value1: SaturnV")
assert.Contains(t, fullString, "X-Test-Value2: Apollo11")
assert.Contains(t, fullString, "Retry-After: 2044")
assert.Contains(t, fullString, "Trace-Id: 123456789ABCDEF0")

filterString := err.HeaderToString([]string{"date", "trace-id", "x-test-value1", "x-test-value2"})

// write order will follow filter arguments
assert.Equal(t, filterString,
"Date: 2024-08-07T12:00:00.009\nTrace-Id: 123456789ABCDEF0\nX-Test-Value1: SaturnV\nX-Test-Value2: Apollo11\n",
)
assert.NotContains(t, filterString, "Content-Type: application/json")
assert.NotContains(t, filterString, "Retry-After: 2044")
}
14 changes: 14 additions & 0 deletions client_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/http"
"github.com/influxdata/influxdb-client-go/v2/domain"
"github.com/influxdata/influxdb-client-go/v2/internal/test"
"github.com/influxdata/influxdb-client-go/v2/log"
Expand Down Expand Up @@ -368,3 +369,16 @@ func TestWriteCustomBatch(t *testing.T) {
}
assert.Equal(t, 10, l)
}

func TestHttpHeadersInError(t *testing.T) {
client := influxdb2.NewClientWithOptions(serverURL, authToken, influxdb2.DefaultOptions().SetLogLevel(0))
err := client.WriteAPIBlocking("my-org", "my-bucket").WriteRecord(context.Background(), "asdf")
assert.Error(t, err)
assert.Len(t, err.(*http.Error).Header, 6)
assert.NotEqual(t, err.(*http.Error).Header.Get("Date"), "")
assert.NotEqual(t, err.(*http.Error).Header.Get("Content-Length"), "")
assert.NotEqual(t, err.(*http.Error).Header.Get("Content-Type"), "")
assert.NotEqual(t, err.(*http.Error).Header.Get("X-Platform-Error-Code"), "")
assert.Contains(t, err.(*http.Error).Header.Get("X-Influxdb-Version"), "v")
assert.Equal(t, err.(*http.Error).Header.Get("X-Influxdb-Build"), "OSS")
}
26 changes: 24 additions & 2 deletions internal/write/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
if batchToWrite != nil {
perror := w.WriteBatch(ctx, batchToWrite)
if perror != nil {
// fmt.Printf("DEBUG perror type %s\n", reflect.TypeOf(perror))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove commented code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

if isIgnorableError(perror) {
log.Warnf("Write error: %s", perror.Error())
} else {
Expand Down Expand Up @@ -196,9 +197,30 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
w.retryAttempts++
log.Debugf("Write proc: next wait for write is %dms\n", w.retryDelay)
} else {
log.Errorf("Write error: %s\n", perror.Error())
logMessage := fmt.Sprintf("Write error: %s", perror.Error())
logHeaders := perror.HeaderToString([]string{
"date",
"trace-id",
"trace-sampled",
"X-Influxdb-Build",
"X-Influxdb-Request-ID",
"X-Influxdb-Version",
})
if len(logHeaders) > 0 {
logMessage += fmt.Sprintf("\nSelect Response Headers:\n%s", logHeaders)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logMessage += fmt.Sprintf("\nSelect Response Headers:\n%s", logHeaders)
logMessage += fmt.Sprintf("\nSelected Response Headers:\n%s", logHeaders)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to "Selected"

}
log.Error(logMessage)
}
return &http2.Error{
StatusCode: int(perror.StatusCode),
Code: perror.Code,
Message: fmt.Errorf(
"write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror,
).Error(),
Err: perror.Err,
RetryAfter: perror.RetryAfter,
Header: perror.Header,
Copy link
Contributor

@vlastahajek vlastahajek Aug 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look good from the API perspective to return http.Error from the write service. And the more serious problem is that the final message looks silly: "400 Bad Request: write failed (attempts 0): 400 Bad Request: { \"code\": \"bad request\", \"message\": \"test header\".

The solution could be to create a write error that would wrap the previous:

package write

type Error struct {
	origin  error
	message string
}

func NewError(err error, message string) *Error {
	return &Error{
		origin:  err,
		message: message,
	}
}

func (e *Error) Error() string {
	return fmt.Sprintf("%s: %s", e.message, e.origin)
}

func (e *Error) Unwrap() error {
	return e.origin
}

Then this code would be:

return write.NewError(perror, fmt.Sprintf("write failed (attempts %d)", batchToWrite.RetryAttempts))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. When I wrote it, I was thinking there has to be a more elegant way of implementing this. I'm still fairly new to golang and the suggested solution did not occur to me. Implemented as suggested, and with helper methods for extracting header values.

}
return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror)
}
}

Expand Down
20 changes: 20 additions & 0 deletions internal/write/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,13 @@ func TestMaxRetryTime(t *testing.T) {
b := NewBatch("2\n", exp)
// First batch will be checked against maxRetryTime and it will expire. New batch will fail and it will added to retry queue
err = srv.HandleWrite(ctx, b)
//fmt.Printf("DEBUG err %v\n", err)
//fmt.Printf("DEBUG err %v\n", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove commented code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

require.NotNil(t, err)
// 1st Batch expires and writing 2nd trows error
assert.Equal(t, "write failed (attempts 1): Unexpected status code 429", err.Error())
assert.Equal(t, 1, srv.retryQueue.list.Len())
// fmt.Printf("DEBUG Header len: %d\n", len(err.(*http.Error).Header))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove commented code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed


//wait until remaining accumulated retryDelay has passed, because there hasn't been a successful write yet
<-time.After(time.Until(srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.retryDelay))))
Expand Down Expand Up @@ -702,3 +705,20 @@ func TestIgnoreErrors(t *testing.T) {
err = srv.HandleWrite(ctx, b)
assert.Error(t, err)
}

func TestHttpErrorHeaders(t *testing.T) {
server := httptest.NewServer(ihttp.HandlerFunc(func(w ihttp.ResponseWriter, r *ihttp.Request) {
w.Header().Set("X-Test-Val1", "Not All Correct")
w.Header().Set("X-Test-Val2", "Atlas LV-3B")
w.WriteHeader(ihttp.StatusBadRequest)
_, _ = w.Write([]byte(`{ "code": "bad request", "message": "test header" }`))
}))
defer server.Close()
svc := NewService("my-org", "my-bucket", http.NewService(server.URL, "", http.DefaultOptions()),
write.DefaultOptions())
err := svc.HandleWrite(context.Background(), NewBatch("1", 20))
assert.Error(t, err)
assert.Equal(t, "400 Bad Request: write failed (attempts 0): 400 Bad Request: { \"code\": \"bad request\", \"message\": \"test header\" }", err.Error())
assert.Equal(t, "Not All Correct", err.(*http.Error).Header.Get("X-Test-Val1"))
assert.Equal(t, "Atlas LV-3B", err.(*http.Error).Header.Get("X-Test-Val2"))
}