diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f4803a..03feaf4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [1.4.7] - 2024-12-13 + +### Changed + +- Updated HTTP span attributes to comply with updated OpenTelemetry semantic conventions. [#182](https://github.com/microsoft/kiota-http-go/issues/182) + ## [1.4.6] - 2024-12-13 ### Changed diff --git a/compression_handler.go b/compression_handler.go index 61d7ba7..526b158 100644 --- a/compression_handler.go +++ b/compression_handler.go @@ -1,161 +1,161 @@ -package nethttplibrary - -import ( - "bytes" - "compress/gzip" - "io" - "net/http" - "strings" - - abstractions "github.com/microsoft/kiota-abstractions-go" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -// CompressionHandler represents a compression middleware -type CompressionHandler struct { - options CompressionOptions -} - -// CompressionOptions is a configuration object for the CompressionHandler middleware -type CompressionOptions struct { - enableCompression bool -} - -type compression interface { - abstractions.RequestOption - ShouldCompress() bool -} - -var compressKey = abstractions.RequestOptionKey{Key: "CompressionHandler"} - -// NewCompressionHandler creates an instance of a compression middleware -func NewCompressionHandler() *CompressionHandler { - options := NewCompressionOptions(true) - return NewCompressionHandlerWithOptions(options) -} - -// NewCompressionHandlerWithOptions creates an instance of the compression middleware with -// specified configurations. -func NewCompressionHandlerWithOptions(option CompressionOptions) *CompressionHandler { - return &CompressionHandler{options: option} -} - -// NewCompressionOptions creates a configuration object for the CompressionHandler -func NewCompressionOptions(enableCompression bool) CompressionOptions { - return CompressionOptions{enableCompression: enableCompression} -} - -// GetKey returns CompressionOptions unique name in context object -func (o CompressionOptions) GetKey() abstractions.RequestOptionKey { - return compressKey -} - -// ShouldCompress reads compression setting form CompressionOptions -func (o CompressionOptions) ShouldCompress() bool { - return o.enableCompression -} - -// Intercept is invoked by the middleware pipeline to either move the request/response -// to the next middleware in the pipeline -func (c *CompressionHandler) Intercept(pipeline Pipeline, middlewareIndex int, req *http.Request) (*http.Response, error) { - reqOption, ok := req.Context().Value(compressKey).(compression) - if !ok { - reqOption = c.options - } - - obsOptions := GetObservabilityOptionsFromRequest(req) - ctx := req.Context() - var span trace.Span - if obsOptions != nil { - ctx, span = otel.GetTracerProvider().Tracer(obsOptions.GetTracerInstrumentationName()).Start(ctx, "CompressionHandler_Intercept") - span.SetAttributes(attribute.Bool("com.microsoft.kiota.handler.compression.enable", true)) - defer span.End() - req = req.WithContext(ctx) - } - - if !reqOption.ShouldCompress() || contentRangeBytesIsPresent(req.Header) || contentEncodingIsPresent(req.Header) || req.Body == nil { - return pipeline.Next(req, middlewareIndex) - } - if span != nil { - span.SetAttributes(attribute.Bool("http.request_body_compressed", true)) - } - - unCompressedBody, err := io.ReadAll(req.Body) - unCompressedContentLength := req.ContentLength - if err != nil { - if span != nil { - span.RecordError(err) - } - return nil, err - } - - compressedBody, size, err := compressReqBody(unCompressedBody) - if err != nil { - if span != nil { - span.RecordError(err) - } - return nil, err - } - - req.Header.Set("Content-Encoding", "gzip") - req.Body = compressedBody - req.ContentLength = int64(size) - - if span != nil { - span.SetAttributes(attribute.Int64("http.request_content_length", req.ContentLength)) - } - - // Sending request with compressed body - resp, err := pipeline.Next(req, middlewareIndex) - if err != nil { - return nil, err - } - - // If response has status 415 retry request with uncompressed body - if resp.StatusCode == 415 { - delete(req.Header, "Content-Encoding") - req.Body = io.NopCloser(bytes.NewBuffer(unCompressedBody)) - req.ContentLength = unCompressedContentLength - - if span != nil { - span.SetAttributes(attribute.Int64("http.request_content_length", req.ContentLength), - attribute.Int("http.request_content_length", 415)) - } - - return pipeline.Next(req, middlewareIndex) - } - - return resp, nil -} - -func contentRangeBytesIsPresent(header http.Header) bool { - contentRanges, _ := header["Content-Range"] - for _, contentRange := range contentRanges { - if strings.Contains(strings.ToLower(contentRange), "bytes") { - return true - } - } - return false -} - -func contentEncodingIsPresent(header http.Header) bool { - _, ok := header["Content-Encoding"] - return ok -} - -func compressReqBody(reqBody []byte) (io.ReadSeekCloser, int, error) { - var buffer bytes.Buffer - gzipWriter := gzip.NewWriter(&buffer) - if _, err := gzipWriter.Write(reqBody); err != nil { - return nil, 0, err - } - - if err := gzipWriter.Close(); err != nil { - return nil, 0, err - } - - reader := bytes.NewReader(buffer.Bytes()) - return NopCloser(reader), buffer.Len(), nil -} +package nethttplibrary + +import ( + "bytes" + "compress/gzip" + "io" + "net/http" + "strings" + + abstractions "github.com/microsoft/kiota-abstractions-go" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// CompressionHandler represents a compression middleware +type CompressionHandler struct { + options CompressionOptions +} + +// CompressionOptions is a configuration object for the CompressionHandler middleware +type CompressionOptions struct { + enableCompression bool +} + +type compression interface { + abstractions.RequestOption + ShouldCompress() bool +} + +var compressKey = abstractions.RequestOptionKey{Key: "CompressionHandler"} + +// NewCompressionHandler creates an instance of a compression middleware +func NewCompressionHandler() *CompressionHandler { + options := NewCompressionOptions(true) + return NewCompressionHandlerWithOptions(options) +} + +// NewCompressionHandlerWithOptions creates an instance of the compression middleware with +// specified configurations. +func NewCompressionHandlerWithOptions(option CompressionOptions) *CompressionHandler { + return &CompressionHandler{options: option} +} + +// NewCompressionOptions creates a configuration object for the CompressionHandler +func NewCompressionOptions(enableCompression bool) CompressionOptions { + return CompressionOptions{enableCompression: enableCompression} +} + +// GetKey returns CompressionOptions unique name in context object +func (o CompressionOptions) GetKey() abstractions.RequestOptionKey { + return compressKey +} + +// ShouldCompress reads compression setting form CompressionOptions +func (o CompressionOptions) ShouldCompress() bool { + return o.enableCompression +} + +// Intercept is invoked by the middleware pipeline to either move the request/response +// to the next middleware in the pipeline +func (c *CompressionHandler) Intercept(pipeline Pipeline, middlewareIndex int, req *http.Request) (*http.Response, error) { + reqOption, ok := req.Context().Value(compressKey).(compression) + if !ok { + reqOption = c.options + } + + obsOptions := GetObservabilityOptionsFromRequest(req) + ctx := req.Context() + var span trace.Span + if obsOptions != nil { + ctx, span = otel.GetTracerProvider().Tracer(obsOptions.GetTracerInstrumentationName()).Start(ctx, "CompressionHandler_Intercept") + span.SetAttributes(attribute.Bool("com.microsoft.kiota.handler.compression.enable", true)) + defer span.End() + req = req.WithContext(ctx) + } + + if !reqOption.ShouldCompress() || contentRangeBytesIsPresent(req.Header) || contentEncodingIsPresent(req.Header) || req.Body == nil { + return pipeline.Next(req, middlewareIndex) + } + if span != nil { + span.SetAttributes(attribute.Bool("http.request_body_compressed", true)) + } + + unCompressedBody, err := io.ReadAll(req.Body) + unCompressedContentLength := req.ContentLength + if err != nil { + if span != nil { + span.RecordError(err) + } + return nil, err + } + + compressedBody, size, err := compressReqBody(unCompressedBody) + if err != nil { + if span != nil { + span.RecordError(err) + } + return nil, err + } + + req.Header.Set("Content-Encoding", "gzip") + req.Body = compressedBody + req.ContentLength = int64(size) + + if span != nil { + span.SetAttributes(httpRequestBodySizeAttribute.Int(int(req.ContentLength))) + } + + // Sending request with compressed body + resp, err := pipeline.Next(req, middlewareIndex) + if err != nil { + return nil, err + } + + // If response has status 415 retry request with uncompressed body + if resp.StatusCode == 415 { + delete(req.Header, "Content-Encoding") + req.Body = io.NopCloser(bytes.NewBuffer(unCompressedBody)) + req.ContentLength = unCompressedContentLength + + if span != nil { + span.SetAttributes(httpRequestBodySizeAttribute.Int(int(req.ContentLength)), + httpResponseStatusCodeAttribute.Int(415)) + } + + return pipeline.Next(req, middlewareIndex) + } + + return resp, nil +} + +func contentRangeBytesIsPresent(header http.Header) bool { + contentRanges, _ := header["Content-Range"] + for _, contentRange := range contentRanges { + if strings.Contains(strings.ToLower(contentRange), "bytes") { + return true + } + } + return false +} + +func contentEncodingIsPresent(header http.Header) bool { + _, ok := header["Content-Encoding"] + return ok +} + +func compressReqBody(reqBody []byte) (io.ReadSeekCloser, int, error) { + var buffer bytes.Buffer + gzipWriter := gzip.NewWriter(&buffer) + if _, err := gzipWriter.Write(reqBody); err != nil { + return nil, 0, err + } + + if err := gzipWriter.Close(); err != nil { + return nil, 0, err + } + + reader := bytes.NewReader(buffer.Bytes()) + return NopCloser(reader), buffer.Len(), nil +} diff --git a/nethttp_request_adapter.go b/nethttp_request_adapter.go index e668c4f..4bf170a 100644 --- a/nethttp_request_adapter.go +++ b/nethttp_request_adapter.go @@ -148,15 +148,15 @@ func (a *NetHttpRequestAdapter) getHttpResponseMessage(ctx context.Context, requ contentLenHeader := response.Header.Get("Content-Length") if contentLenHeader != "" { contentLen, _ := strconv.Atoi(contentLenHeader) - spanForAttributes.SetAttributes(attribute.Int("http.response_content_length", contentLen)) + spanForAttributes.SetAttributes(httpResponseBodySizeAttribute.Int(contentLen)) } contentTypeHeader := response.Header.Get("Content-Type") if contentTypeHeader != "" { - spanForAttributes.SetAttributes(attribute.String("http.response_content_type", contentTypeHeader)) + spanForAttributes.SetAttributes(httpResponseHeaderContentTypeAttribute.String(contentTypeHeader)) } spanForAttributes.SetAttributes( - attribute.Int("http.status_code", response.StatusCode), - attribute.String("http.flavor", response.Proto), + httpResponseStatusCodeAttribute.Int(response.StatusCode), + networkProtocolNameAttribute.String(response.Proto), ) } return a.retryCAEResponseIfRequired(ctx, response, requestInfo, claims, spanForAttributes) @@ -177,7 +177,7 @@ func (a *NetHttpRequestAdapter) retryCAEResponseIfRequired(ctx context.Context, authenticateHeaderVal := response.Header.Get("WWW-Authenticate") if authenticateHeaderVal != "" && reBearer.Match([]byte(authenticateHeaderVal)) { span.AddEvent(AuthenticateChallengedEventKey) - spanForAttributes.SetAttributes(attribute.Int("http.retry_count", 1)) + spanForAttributes.SetAttributes(httpRequestResendCountAttribute.Int(1)) responseClaims := "" parametersRaw := string(reBearer.ReplaceAll([]byte(authenticateHeaderVal), []byte(""))) parameters := strings.Split(parametersRaw, ",") @@ -253,19 +253,19 @@ func (a *NetHttpRequestAdapter) getRequestFromRequestInformation(ctx context.Con if spanForAttributes == nil { spanForAttributes = span } - spanForAttributes.SetAttributes(attribute.String("http.method", requestInfo.Method.String())) + spanForAttributes.SetAttributes(httpRequestMethodAttribute.String(requestInfo.Method.String())) uri, err := requestInfo.GetUri() if err != nil { spanForAttributes.RecordError(err) return nil, err } spanForAttributes.SetAttributes( - attribute.String("http.scheme", uri.Scheme), - attribute.String("http.host", uri.Host), + serverAddressAttribute.String(uri.Scheme), + urlSchemeAttribute.String(uri.Host), ) if a.observabilityOptions.IncludeEUIIAttributes { - spanForAttributes.SetAttributes(attribute.String("http.uri", uri.String())) + spanForAttributes.SetAttributes(urlFullAttribute.String(uri.String())) } request, err := nethttp.NewRequestWithContext(ctx, requestInfo.Method.String(), uri.String(), nil) @@ -290,14 +290,14 @@ func (a *NetHttpRequestAdapter) getRequestFromRequestInformation(ctx context.Con } if request.Header.Get("Content-Type") != "" { spanForAttributes.SetAttributes( - attribute.String("http.request_content_type", request.Header.Get("Content-Type")), + httpRequestHeaderContentTypeAttribute.String(request.Header.Get("Content-Type")), ) } if request.Header.Get("Content-Length") != "" { contentLenVal, _ := strconv.Atoi(request.Header.Get("Content-Length")) request.ContentLength = int64(contentLenVal) spanForAttributes.SetAttributes( - attribute.Int("http.request_content_length", contentLenVal), + httpRequestBodySizeAttribute.Int(contentLenVal), ) } } @@ -313,7 +313,7 @@ func (a *NetHttpRequestAdapter) startTracingSpan(ctx context.Context, requestInf decodedUriTemplate := decodeUriEncodedString(requestInfo.UrlTemplate, []byte{'-', '.', '~', '$'}) telemetryPathValue := queryParametersCleanupRegex.ReplaceAll([]byte(decodedUriTemplate), []byte("")) ctx, span := otel.GetTracerProvider().Tracer(a.observabilityOptions.GetTracerInstrumentationName()).Start(ctx, methodName+" - "+string(telemetryPathValue)) - span.SetAttributes(attribute.String("http.uri_template", decodedUriTemplate)) + span.SetAttributes(urlUriTemplateAttribute.String(decodedUriTemplate)) return ctx, span } diff --git a/redirect_handler.go b/redirect_handler.go index a9c2def..f6f63bb 100644 --- a/redirect_handler.go +++ b/redirect_handler.go @@ -120,7 +120,7 @@ func (middleware RedirectHandler) redirectRequest(ctx context.Context, pipeline if observabilityName != "" { ctx, span := otel.GetTracerProvider().Tracer(observabilityName).Start(ctx, "RedirectHandler_Intercept - redirect "+fmt.Sprint(redirectCount)) span.SetAttributes(attribute.Int("com.microsoft.kiota.handler.redirect.count", redirectCount), - attribute.Int("http.status_code", response.StatusCode), + httpResponseStatusCodeAttribute.Int(response.StatusCode), ) defer span.End() redirectRequest = redirectRequest.WithContext(ctx) diff --git a/retry_handler.go b/retry_handler.go index f366396..6273de7 100644 --- a/retry_handler.go +++ b/retry_handler.go @@ -1,198 +1,197 @@ -package nethttplibrary - -import ( - "context" - "fmt" - "io" - "math" - nethttp "net/http" - "strconv" - "time" - - abs "github.com/microsoft/kiota-abstractions-go" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -// RetryHandler handles transient HTTP responses and retries the request given the retry options -type RetryHandler struct { - // default options to use when evaluating the response - options RetryHandlerOptions -} - -// NewRetryHandler creates a new RetryHandler with default options -func NewRetryHandler() *RetryHandler { - return NewRetryHandlerWithOptions(RetryHandlerOptions{ - ShouldRetry: func(delay time.Duration, executionCount int, request *nethttp.Request, response *nethttp.Response) bool { - return true - }, - }) -} - -// NewRetryHandlerWithOptions creates a new RetryHandler with the given options -func NewRetryHandlerWithOptions(options RetryHandlerOptions) *RetryHandler { - return &RetryHandler{options: options} -} - -const defaultMaxRetries = 3 -const absoluteMaxRetries = 10 -const defaultDelaySeconds = 3 -const absoluteMaxDelaySeconds = 180 - -// RetryHandlerOptions to apply when evaluating the response for retrial -type RetryHandlerOptions struct { - // Callback to determine if the request should be retried - ShouldRetry func(delay time.Duration, executionCount int, request *nethttp.Request, response *nethttp.Response) bool - // The maximum number of times a request can be retried - MaxRetries int - // The delay in seconds between retries - DelaySeconds int -} - -type retryHandlerOptionsInt interface { - abs.RequestOption - GetShouldRetry() func(delay time.Duration, executionCount int, request *nethttp.Request, response *nethttp.Response) bool - GetDelaySeconds() int - GetMaxRetries() int -} - -var retryKeyValue = abs.RequestOptionKey{ - Key: "RetryHandler", -} - -// GetKey returns the key value to be used when the option is added to the request context -func (options *RetryHandlerOptions) GetKey() abs.RequestOptionKey { - return retryKeyValue -} - -// GetShouldRetry returns the should retry callback function which evaluates the response for retrial -func (options *RetryHandlerOptions) GetShouldRetry() func(delay time.Duration, executionCount int, request *nethttp.Request, response *nethttp.Response) bool { - return options.ShouldRetry -} - -// GetDelaySeconds returns the delays in seconds between retries -func (options *RetryHandlerOptions) GetDelaySeconds() int { - if options.DelaySeconds < 1 { - return defaultDelaySeconds - } else if options.DelaySeconds > absoluteMaxDelaySeconds { - return absoluteMaxDelaySeconds - } else { - return options.DelaySeconds - } -} - -// GetMaxRetries returns the maximum number of times a request can be retried -func (options *RetryHandlerOptions) GetMaxRetries() int { - if options.MaxRetries < 1 { - return defaultMaxRetries - } else if options.MaxRetries > absoluteMaxRetries { - return absoluteMaxRetries - } else { - return options.MaxRetries - } -} - -const retryAttemptHeader = "Retry-Attempt" -const retryAfterHeader = "Retry-After" - -const tooManyRequests = 429 -const serviceUnavailable = 503 -const gatewayTimeout = 504 - -// Intercept implements the interface and evaluates whether to retry a failed request. -func (middleware RetryHandler) Intercept(pipeline Pipeline, middlewareIndex int, req *nethttp.Request) (*nethttp.Response, error) { - obsOptions := GetObservabilityOptionsFromRequest(req) - ctx := req.Context() - var span trace.Span - var observabilityName string - if obsOptions != nil { - observabilityName = obsOptions.GetTracerInstrumentationName() - ctx, span = otel.GetTracerProvider().Tracer(observabilityName).Start(ctx, "RetryHandler_Intercept") - span.SetAttributes(attribute.Bool("com.microsoft.kiota.handler.retry.enable", true)) - defer span.End() - req = req.WithContext(ctx) - } - response, err := pipeline.Next(req, middlewareIndex) - if err != nil { - return response, err - } - reqOption, ok := req.Context().Value(retryKeyValue).(retryHandlerOptionsInt) - if !ok { - reqOption = &middleware.options - } - return middleware.retryRequest(ctx, pipeline, middlewareIndex, reqOption, req, response, 0, 0, observabilityName) -} - -func (middleware RetryHandler) retryRequest(ctx context.Context, pipeline Pipeline, middlewareIndex int, options retryHandlerOptionsInt, req *nethttp.Request, resp *nethttp.Response, executionCount int, cumulativeDelay time.Duration, observabilityName string) (*nethttp.Response, error) { - if middleware.isRetriableErrorCode(resp.StatusCode) && - middleware.isRetriableRequest(req) && - executionCount < options.GetMaxRetries() && - cumulativeDelay < time.Duration(absoluteMaxDelaySeconds)*time.Second && - options.GetShouldRetry()(cumulativeDelay, executionCount, req, resp) { - executionCount++ - delay := middleware.getRetryDelay(req, resp, options, executionCount) - cumulativeDelay += delay - req.Header.Set(retryAttemptHeader, strconv.Itoa(executionCount)) - if req.Body != nil { - s, ok := req.Body.(io.Seeker) - if ok { - s.Seek(0, io.SeekStart) - } - } - if observabilityName != "" { - ctx, span := otel.GetTracerProvider().Tracer(observabilityName).Start(ctx, "RetryHandler_Intercept - attempt "+fmt.Sprint(executionCount)) - span.SetAttributes(attribute.Int("http.request.resend_count", executionCount), - - attribute.Int("http.status_code", resp.StatusCode), - attribute.Float64("http.request.resend_delay", delay.Seconds()), - - ) - defer span.End() - req = req.WithContext(ctx) - } - t := time.NewTimer(delay) - select { - case <-ctx.Done(): - // Return without retrying if the context was cancelled. - return nil, ctx.Err() - - // Leaving this case empty causes it to exit the switch-block. - case <-t.C: - } - response, err := pipeline.Next(req, middlewareIndex) - if err != nil { - return response, err - } - return middleware.retryRequest(ctx, pipeline, middlewareIndex, options, req, response, executionCount, cumulativeDelay, observabilityName) - } - return resp, nil -} - -func (middleware RetryHandler) isRetriableErrorCode(code int) bool { - return code == tooManyRequests || code == serviceUnavailable || code == gatewayTimeout -} -func (middleware RetryHandler) isRetriableRequest(req *nethttp.Request) bool { - isBodiedMethod := req.Method == "POST" || req.Method == "PUT" || req.Method == "PATCH" - if isBodiedMethod && req.Body != nil { - return req.ContentLength != -1 - } - return true -} - -func (middleware RetryHandler) getRetryDelay(req *nethttp.Request, resp *nethttp.Response, options retryHandlerOptionsInt, executionCount int) time.Duration { - retryAfter := resp.Header.Get(retryAfterHeader) - if retryAfter != "" { - retryAfterDelay, err := strconv.ParseFloat(retryAfter, 64) - if err == nil { - return time.Duration(retryAfterDelay) * time.Second - } - - // parse the header if it's a date - t, err := time.Parse(time.RFC1123, retryAfter) - if err == nil { - return t.Sub(time.Now()) - } - } - return time.Duration(math.Pow(float64(options.GetDelaySeconds()), float64(executionCount))) * time.Second -} +package nethttplibrary + +import ( + "context" + "fmt" + "io" + "math" + nethttp "net/http" + "strconv" + "time" + + abs "github.com/microsoft/kiota-abstractions-go" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// RetryHandler handles transient HTTP responses and retries the request given the retry options +type RetryHandler struct { + // default options to use when evaluating the response + options RetryHandlerOptions +} + +// NewRetryHandler creates a new RetryHandler with default options +func NewRetryHandler() *RetryHandler { + return NewRetryHandlerWithOptions(RetryHandlerOptions{ + ShouldRetry: func(delay time.Duration, executionCount int, request *nethttp.Request, response *nethttp.Response) bool { + return true + }, + }) +} + +// NewRetryHandlerWithOptions creates a new RetryHandler with the given options +func NewRetryHandlerWithOptions(options RetryHandlerOptions) *RetryHandler { + return &RetryHandler{options: options} +} + +const defaultMaxRetries = 3 +const absoluteMaxRetries = 10 +const defaultDelaySeconds = 3 +const absoluteMaxDelaySeconds = 180 + +// RetryHandlerOptions to apply when evaluating the response for retrial +type RetryHandlerOptions struct { + // Callback to determine if the request should be retried + ShouldRetry func(delay time.Duration, executionCount int, request *nethttp.Request, response *nethttp.Response) bool + // The maximum number of times a request can be retried + MaxRetries int + // The delay in seconds between retries + DelaySeconds int +} + +type retryHandlerOptionsInt interface { + abs.RequestOption + GetShouldRetry() func(delay time.Duration, executionCount int, request *nethttp.Request, response *nethttp.Response) bool + GetDelaySeconds() int + GetMaxRetries() int +} + +var retryKeyValue = abs.RequestOptionKey{ + Key: "RetryHandler", +} + +// GetKey returns the key value to be used when the option is added to the request context +func (options *RetryHandlerOptions) GetKey() abs.RequestOptionKey { + return retryKeyValue +} + +// GetShouldRetry returns the should retry callback function which evaluates the response for retrial +func (options *RetryHandlerOptions) GetShouldRetry() func(delay time.Duration, executionCount int, request *nethttp.Request, response *nethttp.Response) bool { + return options.ShouldRetry +} + +// GetDelaySeconds returns the delays in seconds between retries +func (options *RetryHandlerOptions) GetDelaySeconds() int { + if options.DelaySeconds < 1 { + return defaultDelaySeconds + } else if options.DelaySeconds > absoluteMaxDelaySeconds { + return absoluteMaxDelaySeconds + } else { + return options.DelaySeconds + } +} + +// GetMaxRetries returns the maximum number of times a request can be retried +func (options *RetryHandlerOptions) GetMaxRetries() int { + if options.MaxRetries < 1 { + return defaultMaxRetries + } else if options.MaxRetries > absoluteMaxRetries { + return absoluteMaxRetries + } else { + return options.MaxRetries + } +} + +const retryAttemptHeader = "Retry-Attempt" +const retryAfterHeader = "Retry-After" + +const tooManyRequests = 429 +const serviceUnavailable = 503 +const gatewayTimeout = 504 + +// Intercept implements the interface and evaluates whether to retry a failed request. +func (middleware RetryHandler) Intercept(pipeline Pipeline, middlewareIndex int, req *nethttp.Request) (*nethttp.Response, error) { + obsOptions := GetObservabilityOptionsFromRequest(req) + ctx := req.Context() + var span trace.Span + var observabilityName string + if obsOptions != nil { + observabilityName = obsOptions.GetTracerInstrumentationName() + ctx, span = otel.GetTracerProvider().Tracer(observabilityName).Start(ctx, "RetryHandler_Intercept") + span.SetAttributes(attribute.Bool("com.microsoft.kiota.handler.retry.enable", true)) + defer span.End() + req = req.WithContext(ctx) + } + response, err := pipeline.Next(req, middlewareIndex) + if err != nil { + return response, err + } + reqOption, ok := req.Context().Value(retryKeyValue).(retryHandlerOptionsInt) + if !ok { + reqOption = &middleware.options + } + return middleware.retryRequest(ctx, pipeline, middlewareIndex, reqOption, req, response, 0, 0, observabilityName) +} + +func (middleware RetryHandler) retryRequest(ctx context.Context, pipeline Pipeline, middlewareIndex int, options retryHandlerOptionsInt, req *nethttp.Request, resp *nethttp.Response, executionCount int, cumulativeDelay time.Duration, observabilityName string) (*nethttp.Response, error) { + if middleware.isRetriableErrorCode(resp.StatusCode) && + middleware.isRetriableRequest(req) && + executionCount < options.GetMaxRetries() && + cumulativeDelay < time.Duration(absoluteMaxDelaySeconds)*time.Second && + options.GetShouldRetry()(cumulativeDelay, executionCount, req, resp) { + executionCount++ + delay := middleware.getRetryDelay(req, resp, options, executionCount) + cumulativeDelay += delay + req.Header.Set(retryAttemptHeader, strconv.Itoa(executionCount)) + if req.Body != nil { + s, ok := req.Body.(io.Seeker) + if ok { + s.Seek(0, io.SeekStart) + } + } + if observabilityName != "" { + ctx, span := otel.GetTracerProvider().Tracer(observabilityName).Start(ctx, "RetryHandler_Intercept - attempt "+fmt.Sprint(executionCount)) + span.SetAttributes(attribute.Int("http.request.resend_count", executionCount), + + httpResponseStatusCodeAttribute.Int(resp.StatusCode), + attribute.Float64("http.request.resend_delay", delay.Seconds()), + ) + defer span.End() + req = req.WithContext(ctx) + } + t := time.NewTimer(delay) + select { + case <-ctx.Done(): + // Return without retrying if the context was cancelled. + return nil, ctx.Err() + + // Leaving this case empty causes it to exit the switch-block. + case <-t.C: + } + response, err := pipeline.Next(req, middlewareIndex) + if err != nil { + return response, err + } + return middleware.retryRequest(ctx, pipeline, middlewareIndex, options, req, response, executionCount, cumulativeDelay, observabilityName) + } + return resp, nil +} + +func (middleware RetryHandler) isRetriableErrorCode(code int) bool { + return code == tooManyRequests || code == serviceUnavailable || code == gatewayTimeout +} +func (middleware RetryHandler) isRetriableRequest(req *nethttp.Request) bool { + isBodiedMethod := req.Method == "POST" || req.Method == "PUT" || req.Method == "PATCH" + if isBodiedMethod && req.Body != nil { + return req.ContentLength != -1 + } + return true +} + +func (middleware RetryHandler) getRetryDelay(req *nethttp.Request, resp *nethttp.Response, options retryHandlerOptionsInt, executionCount int) time.Duration { + retryAfter := resp.Header.Get(retryAfterHeader) + if retryAfter != "" { + retryAfterDelay, err := strconv.ParseFloat(retryAfter, 64) + if err == nil { + return time.Duration(retryAfterDelay) * time.Second + } + + // parse the header if it's a date + t, err := time.Parse(time.RFC1123, retryAfter) + if err == nil { + return t.Sub(time.Now()) + } + } + return time.Duration(math.Pow(float64(options.GetDelaySeconds()), float64(executionCount))) * time.Second +} diff --git a/span_attributes.go b/span_attributes.go new file mode 100644 index 0000000..c91f341 --- /dev/null +++ b/span_attributes.go @@ -0,0 +1,35 @@ +package nethttplibrary + +import "go.opentelemetry.io/otel/attribute" + +// HTTP Request attributes +const ( + httpRequestBodySizeAttribute = attribute.Key("http.request.body.size") + httpRequestResendCountAttribute = attribute.Key("http.request.resend_count") + httpRequestMethodAttribute = attribute.Key("http.request.method") + httpRequestHeaderContentTypeAttribute = attribute.Key("http.request.header.content-type") +) + +// HTTP Response attributes +const ( + httpResponseBodySizeAttribute = attribute.Key("http.response.body.size") + httpResponseHeaderContentTypeAttribute = attribute.Key("http.response.header.content-type") + httpResponseStatusCodeAttribute = attribute.Key("http.response.status_code") +) + +// Network attributes +const ( + networkProtocolNameAttribute = attribute.Key("network.protocol.name") +) + +// Server attributes +const ( + serverAddressAttribute = attribute.Key("server.address") +) + +// URL attributes +const ( + urlFullAttribute = attribute.Key("url.full") + urlSchemeAttribute = attribute.Key("url.scheme") + urlUriTemplateAttribute = attribute.Key("url.uri_template") +) diff --git a/user_agent_handler.go b/user_agent_handler.go index 51af61b..620ab2d 100644 --- a/user_agent_handler.go +++ b/user_agent_handler.go @@ -42,7 +42,7 @@ func NewUserAgentHandlerOptions() *UserAgentHandlerOptions { return &UserAgentHandlerOptions{ Enabled: true, ProductName: "kiota-go", - ProductVersion: "1.4.6", + ProductVersion: "1.4.7", } }