From c538b06667b930c8a4b73b8e8dc67ec158b51999 Mon Sep 17 00:00:00 2001 From: Tony Redondo Date: Fri, 12 Jan 2024 18:04:58 +0100 Subject: [PATCH 1/6] [CIVisibility] - Add support for GZip compression in Multipart payloads --- .../Agent/IMultipartApiRequest.cs | 3 +- .../Agent/Transports/ApiWebRequest.cs | 28 ++++++++--- .../Agent/Transports/GzipCompressedContent.cs | 50 +++++++++++++++++++ .../Agent/Transports/HttpClientRequest.cs | 14 ++++-- .../Agent/Transports/MultipartCompression.cs | 14 ++++++ .../Ci/Agent/CIWriterHttpSender.cs | 2 +- .../Ci/IntelligentTestRunnerClient.cs | 4 +- 7 files changed, 102 insertions(+), 13 deletions(-) create mode 100644 tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs create mode 100644 tracer/src/Datadog.Trace/Agent/Transports/MultipartCompression.cs diff --git a/tracer/src/Datadog.Trace/Agent/IMultipartApiRequest.cs b/tracer/src/Datadog.Trace/Agent/IMultipartApiRequest.cs index 25a0a7dd6305..6b8b0f500d75 100644 --- a/tracer/src/Datadog.Trace/Agent/IMultipartApiRequest.cs +++ b/tracer/src/Datadog.Trace/Agent/IMultipartApiRequest.cs @@ -6,11 +6,12 @@ #nullable enable using System.Threading.Tasks; +using Datadog.Trace.Agent.Transports; namespace Datadog.Trace.Agent { internal interface IMultipartApiRequest { - Task PostAsync(params MultipartFormItem[] items); + Task PostAsync(MultipartFormItem[] items, MultipartCompression multipartCompression = MultipartCompression.None); } } diff --git a/tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs b/tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs index a1e08fb4a25b..a2bec4c74a43 100644 --- a/tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs +++ b/tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs @@ -5,6 +5,7 @@ using System; using System.IO; +using System.IO.Compression; using System.Net; using System.Text; using System.Threading.Tasks; @@ -74,8 +75,9 @@ public async Task PostAsync(Func writeToRequestStrea /// WARNING: Name and FileName of each MultipartFormItem instance must be ASCII encoding compatible. /// /// Multipart form data items + /// Multipart compression /// Task with the response - public async Task PostAsync(params MultipartFormItem[] items) + public async Task PostAsync(MultipartFormItem[] items, MultipartCompression multipartCompression = MultipartCompression.None) { if (items is null) { @@ -84,9 +86,25 @@ public async Task PostAsync(params MultipartFormItem[] items) Log.Debug("Sending multipart form request with {Count} items.", items.Length); - ResetRequest(method: "POST", contentType: "multipart/form-data; boundary=" + Boundary, contentEncoding: null); + ResetRequest(method: "POST", contentType: "multipart/form-data; boundary=" + Boundary, contentEncoding: multipartCompression == MultipartCompression.GZip ? "gzip" : null); + using (var reqStream = await _request.GetRequestStreamAsync().ConfigureAwait(false)) + { + if (multipartCompression == MultipartCompression.GZip) + { + Log.Debug("Using MultipartCompression.GZip"); + using var gzip = new GZipStream(reqStream, CompressionMode.Compress, true); + await WriteToStreamAsync(items, gzip).ConfigureAwait(false); + Log.Debug("Compressing multipart payload..."); + } + else + { + await WriteToStreamAsync(items, reqStream).ConfigureAwait(false); + } + } - using (var requestStream = await _request.GetRequestStreamAsync().ConfigureAwait(false)) + return await FinishAndGetResponse().ConfigureAwait(false); + + async Task WriteToStreamAsync(MultipartFormItem[] multipartItems, Stream requestStream) { // Write form request using the boundary var boundaryBytes = _boundarySeparatorInBytes ??= Encoding.ASCII.GetBytes(BoundarySeparator); @@ -94,7 +112,7 @@ public async Task PostAsync(params MultipartFormItem[] items) // Write each MultipartFormItem var itemsWritten = 0; - foreach (var item in items) + foreach (var item in multipartItems) { byte[] headerBytes = null; @@ -159,8 +177,6 @@ public async Task PostAsync(params MultipartFormItem[] items) await requestStream.WriteAsync(trailerBytes, 0, trailerBytes.Length).ConfigureAwait(false); } } - - return await FinishAndGetResponse().ConfigureAwait(false); } private void ResetRequest(string method, string contentType, string contentEncoding) diff --git a/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs b/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs new file mode 100644 index 000000000000..f5268d249bf5 --- /dev/null +++ b/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs @@ -0,0 +1,50 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +#if NETCOREAPP +using System.IO; +using System.IO.Compression; +using System.Net; +using System.Net.Http; +using System.Threading.Tasks; +using Datadog.Trace.Logging; + +namespace Datadog.Trace.Agent.Transports; + +internal class GzipCompressedContent : HttpContent +{ + private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor(); + + private readonly HttpContent _content; + + public GzipCompressedContent(HttpContent content) + { + // Copy original headers + foreach (var header in content.Headers) + { + Headers.TryAddWithoutValidation(header.Key, header.Value); + } + + Headers.ContentEncoding.Add("gzip"); + _content = content; + } + + protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context) + { + Log.Debug("Compressing multipart payload..."); + using var gzip = new GZipStream(stream, CompressionMode.Compress, true); + await _content.CopyToAsync(gzip).ConfigureAwait(false); + } + + protected override bool TryComputeLength(out long length) + { + length = -1; + return false; + } +} + +#endif diff --git a/tracer/src/Datadog.Trace/Agent/Transports/HttpClientRequest.cs b/tracer/src/Datadog.Trace/Agent/Transports/HttpClientRequest.cs index 1dbd7bcc2d17..2ce5c5132f4e 100644 --- a/tracer/src/Datadog.Trace/Agent/Transports/HttpClientRequest.cs +++ b/tracer/src/Datadog.Trace/Agent/Transports/HttpClientRequest.cs @@ -123,7 +123,7 @@ public async Task PostAsync(Func writeToRequestStrea return new HttpClientResponse(response); } - public async Task PostAsync(params MultipartFormItem[] items) + public async Task PostAsync(MultipartFormItem[] items, MultipartCompression multipartCompression = MultipartCompression.None) { if (items is null) { @@ -133,8 +133,6 @@ public async Task PostAsync(params MultipartFormItem[] items) Log.Debug("Sending multipart form request with {Count} items.", items.Length); using var formDataContent = new MultipartFormDataContent(boundary: Boundary); - _postRequest.Content = formDataContent; - foreach (var item in items) { HttpContent content = null; @@ -166,6 +164,16 @@ public async Task PostAsync(params MultipartFormItem[] items) } } + if (multipartCompression == MultipartCompression.GZip) + { + Log.Debug("Using MultipartCompression.GZip"); + _postRequest.Content = new GzipCompressedContent(formDataContent); + } + else + { + _postRequest.Content = formDataContent; + } + var response = await _client.SendAsync(_postRequest).ConfigureAwait(false); return new HttpClientResponse(response); } diff --git a/tracer/src/Datadog.Trace/Agent/Transports/MultipartCompression.cs b/tracer/src/Datadog.Trace/Agent/Transports/MultipartCompression.cs new file mode 100644 index 000000000000..449e1a7a7a34 --- /dev/null +++ b/tracer/src/Datadog.Trace/Agent/Transports/MultipartCompression.cs @@ -0,0 +1,14 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +#nullable enable + +namespace Datadog.Trace.Agent.Transports; + +internal enum MultipartCompression +{ + None, + GZip +} diff --git a/tracer/src/Datadog.Trace/Ci/Agent/CIWriterHttpSender.cs b/tracer/src/Datadog.Trace/Ci/Agent/CIWriterHttpSender.cs index becdba40f9ed..b040a6fb8c86 100644 --- a/tracer/src/Datadog.Trace/Ci/Agent/CIWriterHttpSender.cs +++ b/tracer/src/Datadog.Trace/Ci/Agent/CIWriterHttpSender.cs @@ -255,7 +255,7 @@ await SendPayloadAsync( { if (request is IMultipartApiRequest multipartRequest) { - return multipartRequest.PostAsync(payloadArray); + return multipartRequest.PostAsync(payloadArray, MultipartCompression.GZip); } MultipartApiRequestNotSupported.Throw(); diff --git a/tracer/src/Datadog.Trace/Ci/IntelligentTestRunnerClient.cs b/tracer/src/Datadog.Trace/Ci/IntelligentTestRunnerClient.cs index fdd2b3c5aee9..beb9d7597624 100644 --- a/tracer/src/Datadog.Trace/Ci/IntelligentTestRunnerClient.cs +++ b/tracer/src/Datadog.Trace/Ci/IntelligentTestRunnerClient.cs @@ -583,9 +583,9 @@ async Task InternalSendObjectsPackFileAsync(string packFile, bool finalTry try { - using var response = await multipartRequest.PostAsync( + using var response = await multipartRequest.PostAsync([ new MultipartFormItem("pushedSha", MimeTypes.Json, null, new ArraySegment(jsonPushedShaBytes)), - new MultipartFormItem("packfile", "application/octet-stream", null, fileStream)) + new MultipartFormItem("packfile", "application/octet-stream", null, fileStream)]) .ConfigureAwait(false); var responseContent = await response.ReadAsStringAsync().ConfigureAwait(false); if (TelemetryHelper.GetErrorTypeFromStatusCode(response.StatusCode) is { } errorType) From b1dd9a3edbc3739081e0ecd3974c2645a60b16fd Mon Sep 17 00:00:00 2001 From: Tony Redondo Date: Mon, 15 Jan 2024 10:54:04 +0100 Subject: [PATCH 2/6] Change comment --- .../src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs b/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs index f5268d249bf5..624333d828ee 100644 --- a/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs +++ b/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs @@ -35,7 +35,7 @@ public GzipCompressedContent(HttpContent content) protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context) { - Log.Debug("Compressing multipart payload..."); + Log.Debug("GZip compressing payload..."); using var gzip = new GZipStream(stream, CompressionMode.Compress, true); await _content.CopyToAsync(gzip).ConfigureAwait(false); } From 1cdd4ea6496224cf04e31b460f642b6c64d9c4c3 Mon Sep 17 00:00:00 2001 From: Tony Redondo Date: Mon, 15 Jan 2024 11:01:14 +0100 Subject: [PATCH 3/6] Adding await --- .../Datadog.Trace/Agent/Transports/GzipCompressedContent.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs b/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs index 624333d828ee..a5b6c30eb671 100644 --- a/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs +++ b/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs @@ -36,7 +36,9 @@ public GzipCompressedContent(HttpContent content) protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context) { Log.Debug("GZip compressing payload..."); - using var gzip = new GZipStream(stream, CompressionMode.Compress, true); +#pragma warning disable CA2007 + await using var gzip = new GZipStream(stream, CompressionMode.Compress, true); +#pragma warning restore CA2007 await _content.CopyToAsync(gzip).ConfigureAwait(false); } From 41bc898d46a3d9ecdffbb8b6569d1c272ff99439 Mon Sep 17 00:00:00 2001 From: Tony Redondo Date: Mon, 15 Jan 2024 11:12:26 +0100 Subject: [PATCH 4/6] Enable if evp proxy is disabled --- .../src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml | 1 + tracer/src/Datadog.Trace/Ci/Agent/CIWriterHttpSender.cs | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml b/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml index 69cf963187c8..91de869e2059 100644 --- a/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml +++ b/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml @@ -717,6 +717,7 @@ + diff --git a/tracer/src/Datadog.Trace/Ci/Agent/CIWriterHttpSender.cs b/tracer/src/Datadog.Trace/Ci/Agent/CIWriterHttpSender.cs index b040a6fb8c86..2609d695ae30 100644 --- a/tracer/src/Datadog.Trace/Ci/Agent/CIWriterHttpSender.cs +++ b/tracer/src/Datadog.Trace/Ci/Agent/CIWriterHttpSender.cs @@ -255,7 +255,7 @@ await SendPayloadAsync( { if (request is IMultipartApiRequest multipartRequest) { - return multipartRequest.PostAsync(payloadArray, MultipartCompression.GZip); + return multipartRequest.PostAsync(payloadArray, payload.UseEvpProxy ? MultipartCompression.None : MultipartCompression.GZip); } MultipartApiRequestNotSupported.Throw(); From 790fc27dd9297aedb21e3091d87043f8e05e23f2 Mon Sep 17 00:00:00 2001 From: Tony Redondo Date: Mon, 15 Jan 2024 11:53:02 +0100 Subject: [PATCH 5/6] Applying changes from review. --- .../Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml | 1 - tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs | 3 ++- .../Datadog.Trace/Agent/Transports/GzipCompressedContent.cs | 5 ++--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml b/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml index 91de869e2059..69cf963187c8 100644 --- a/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml +++ b/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml @@ -717,7 +717,6 @@ - diff --git a/tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs b/tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs index a2bec4c74a43..ead6a39389ba 100644 --- a/tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs +++ b/tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs @@ -92,8 +92,9 @@ public async Task PostAsync(MultipartFormItem[] items, MultipartCo if (multipartCompression == MultipartCompression.GZip) { Log.Debug("Using MultipartCompression.GZip"); - using var gzip = new GZipStream(reqStream, CompressionMode.Compress, true); + using var gzip = new GZipStream(reqStream, CompressionMode.Compress, leaveOpen: true); await WriteToStreamAsync(items, gzip).ConfigureAwait(false); + await gzip.FlushAsync().ConfigureAwait(false); Log.Debug("Compressing multipart payload..."); } else diff --git a/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs b/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs index a5b6c30eb671..bb6d237de486 100644 --- a/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs +++ b/tracer/src/Datadog.Trace/Agent/Transports/GzipCompressedContent.cs @@ -36,10 +36,9 @@ public GzipCompressedContent(HttpContent content) protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context) { Log.Debug("GZip compressing payload..."); -#pragma warning disable CA2007 - await using var gzip = new GZipStream(stream, CompressionMode.Compress, true); -#pragma warning restore CA2007 + using var gzip = new GZipStream(stream, CompressionMode.Compress, leaveOpen: true); await _content.CopyToAsync(gzip).ConfigureAwait(false); + await gzip.FlushAsync().ConfigureAwait(false); } protected override bool TryComputeLength(out long length) From 255e4c74e5baa11eb8381f20fea32ca1ef471796 Mon Sep 17 00:00:00 2001 From: Tony Redondo Date: Mon, 15 Jan 2024 14:16:09 +0100 Subject: [PATCH 6/6] Add test --- .../Agent/CiVisibilityProtocolWriterTests.cs | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tracer/test/Datadog.Trace.ClrProfiler.IntegrationTests/CI/Agent/CiVisibilityProtocolWriterTests.cs b/tracer/test/Datadog.Trace.ClrProfiler.IntegrationTests/CI/Agent/CiVisibilityProtocolWriterTests.cs index fe360470dd7d..53bc838ee46f 100644 --- a/tracer/test/Datadog.Trace.ClrProfiler.IntegrationTests/CI/Agent/CiVisibilityProtocolWriterTests.cs +++ b/tracer/test/Datadog.Trace.ClrProfiler.IntegrationTests/CI/Agent/CiVisibilityProtocolWriterTests.cs @@ -6,9 +6,15 @@ using System; using System.Collections.Generic; using System.IO; +using System.IO.Compression; using System.Linq; +using System.Net; +using System.Net.Http; +using System.Text; +using System.Threading; using System.Threading.Tasks; using Datadog.Trace.Agent; +using Datadog.Trace.Agent.Transports; using Datadog.Trace.Ci; using Datadog.Trace.Ci.Agent; using Datadog.Trace.Ci.Coverage.Models.Tests; @@ -18,6 +24,7 @@ using Datadog.Trace.Vendors.MessagePack; using Datadog.Trace.Vendors.Newtonsoft.Json; using Datadog.Trace.Vendors.Newtonsoft.Json.Linq; +using FluentAssertions; using Moq; using Xunit; @@ -140,6 +147,29 @@ public async Task AgentlessCodeCoverageEvent() } } +#if NETCOREAPP3_1_OR_GREATER + [Fact] + public async Task AgentlessCodeCoverageCompressedPayloadTest() + { + var requestFactory = new HttpClientRequestFactory(new Uri("http://localhost"), Array.Empty>(), new TestMessageHandler()); + var apiRequest = (IMultipartApiRequest)requestFactory.Create(new Uri("http://localhost/api")); + var response = await apiRequest.PostAsync( + new[] { new MultipartFormItem("TestName", "application/binary", "TestFileName", "TestContent"u8.ToArray()) }, + MultipartCompression.GZip) + .ConfigureAwait(false); + var stream = await response.GetStreamAsync().ConfigureAwait(false); + using var unzippedStream = new GZipStream(stream, CompressionMode.Decompress, leaveOpen: true); + var ms = new MemoryStream(); + await unzippedStream.CopyToAsync(ms).ConfigureAwait(false); + ms.Position = 0; + using var rs = new StreamReader(ms, Encoding.UTF8); + var requestContent = await rs.ReadToEndAsync().ConfigureAwait(false); + requestContent.Should().Contain("Content-Type: application/binary"); + requestContent.Should().Contain("Content-Disposition: form-data; name=TestName; filename=TestFileName; filename*=utf-8''TestFileName"); + requestContent.Should().Contain("TestContent"); + } +#endif + [Fact] public async Task SlowSenderTest() { @@ -325,5 +355,17 @@ public void CoverageBufferTest() bufferSize *= 2; } } + + internal class TestMessageHandler : HttpMessageHandler + { + protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + var ms = new MemoryStream(); + await request.Content.CopyToAsync(ms).ConfigureAwait(false); + var responseMessage = new HttpResponseMessage(HttpStatusCode.OK); + responseMessage.Content = new ByteArrayContent(ms.ToArray()); + return responseMessage; + } + } } }