Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CIVisibility] - Add support for GZip compression in Multipart payloads #5060

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tracer/src/Datadog.Trace/Agent/IMultipartApiRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
#nullable enable

using System.Threading.Tasks;
using Datadog.Trace.Agent.Transports;

namespace Datadog.Trace.Agent
{
internal interface IMultipartApiRequest
{
Task<IApiResponse> PostAsync(params MultipartFormItem[] items);
Task<IApiResponse> PostAsync(MultipartFormItem[] items, MultipartCompression multipartCompression = MultipartCompression.None);
}
}
29 changes: 23 additions & 6 deletions tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

using System;
using System.IO;
using System.IO.Compression;
using System.Net;
using System.Text;
using System.Threading.Tasks;
Expand Down Expand Up @@ -74,8 +75,9 @@ public async Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStrea
/// WARNING: Name and FileName of each MultipartFormItem instance must be ASCII encoding compatible.
/// </summary>
/// <param name="items">Multipart form data items</param>
/// <param name="multipartCompression">Multipart compression</param>
/// <returns>Task with the response</returns>
public async Task<IApiResponse> PostAsync(params MultipartFormItem[] items)
public async Task<IApiResponse> PostAsync(MultipartFormItem[] items, MultipartCompression multipartCompression = MultipartCompression.None)
{
if (items is null)
{
Expand All @@ -84,17 +86,34 @@ public async Task<IApiResponse> PostAsync(params MultipartFormItem[] items)

Log.Debug<int>("Sending multipart form request with {Count} items.", items.Length);

ResetRequest(method: "POST", contentType: "multipart/form-data; boundary=" + Boundary, contentEncoding: null);
ResetRequest(method: "POST", contentType: "multipart/form-data; boundary=" + Boundary, contentEncoding: multipartCompression == MultipartCompression.GZip ? "gzip" : null);
using (var reqStream = await _request.GetRequestStreamAsync().ConfigureAwait(false))
{
if (multipartCompression == MultipartCompression.GZip)
{
Log.Debug("Using MultipartCompression.GZip");
using var gzip = new GZipStream(reqStream, CompressionMode.Compress, leaveOpen: true);
await WriteToStreamAsync(items, gzip).ConfigureAwait(false);
await gzip.FlushAsync().ConfigureAwait(false);
Log.Debug("Compressing multipart payload...");
}
else
{
await WriteToStreamAsync(items, reqStream).ConfigureAwait(false);
}
}

using (var requestStream = await _request.GetRequestStreamAsync().ConfigureAwait(false))
return await FinishAndGetResponse().ConfigureAwait(false);

async Task WriteToStreamAsync(MultipartFormItem[] multipartItems, Stream requestStream)
{
// Write form request using the boundary
var boundaryBytes = _boundarySeparatorInBytes ??= Encoding.ASCII.GetBytes(BoundarySeparator);
var trailerBytes = _boundaryTrailerInBytes ??= Encoding.ASCII.GetBytes(BoundaryTrailer);

// Write each MultipartFormItem
var itemsWritten = 0;
foreach (var item in items)
foreach (var item in multipartItems)
{
byte[] headerBytes = null;

Expand Down Expand Up @@ -159,8 +178,6 @@ public async Task<IApiResponse> PostAsync(params MultipartFormItem[] items)
await requestStream.WriteAsync(trailerBytes, 0, trailerBytes.Length).ConfigureAwait(false);
}
}

return await FinishAndGetResponse().ConfigureAwait(false);
}

private void ResetRequest(string method, string contentType, string contentEncoding)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// <copyright file="GzipCompressedContent.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

#if NETCOREAPP
using System.IO;
using System.IO.Compression;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Datadog.Trace.Logging;

namespace Datadog.Trace.Agent.Transports;

internal class GzipCompressedContent : HttpContent
{
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<GzipCompressedContent>();

private readonly HttpContent _content;

public GzipCompressedContent(HttpContent content)
{
// Copy original headers
foreach (var header in content.Headers)
{
Headers.TryAddWithoutValidation(header.Key, header.Value);
}

Headers.ContentEncoding.Add("gzip");
_content = content;
}

protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context)
{
Log.Debug("GZip compressing payload...");
using var gzip = new GZipStream(stream, CompressionMode.Compress, leaveOpen: true);
await _content.CopyToAsync(gzip).ConfigureAwait(false);
await gzip.FlushAsync().ConfigureAwait(false);
}

protected override bool TryComputeLength(out long length)
{
length = -1;
return false;
}
}

#endif
14 changes: 11 additions & 3 deletions tracer/src/Datadog.Trace/Agent/Transports/HttpClientRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public async Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStrea
return new HttpClientResponse(response);
}

public async Task<IApiResponse> PostAsync(params MultipartFormItem[] items)
public async Task<IApiResponse> PostAsync(MultipartFormItem[] items, MultipartCompression multipartCompression = MultipartCompression.None)
{
if (items is null)
{
Expand All @@ -133,8 +133,6 @@ public async Task<IApiResponse> PostAsync(params MultipartFormItem[] items)
Log.Debug<int>("Sending multipart form request with {Count} items.", items.Length);

using var formDataContent = new MultipartFormDataContent(boundary: Boundary);
_postRequest.Content = formDataContent;

foreach (var item in items)
{
HttpContent content = null;
Expand Down Expand Up @@ -166,6 +164,16 @@ public async Task<IApiResponse> PostAsync(params MultipartFormItem[] items)
}
}

if (multipartCompression == MultipartCompression.GZip)
{
Log.Debug("Using MultipartCompression.GZip");
_postRequest.Content = new GzipCompressedContent(formDataContent);
}
else
{
_postRequest.Content = formDataContent;
}

var response = await _client.SendAsync(_postRequest).ConfigureAwait(false);
return new HttpClientResponse(response);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// <copyright file="MultipartCompression.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

namespace Datadog.Trace.Agent.Transports;

internal enum MultipartCompression
{
None,
GZip
}
2 changes: 1 addition & 1 deletion tracer/src/Datadog.Trace/Ci/Agent/CIWriterHttpSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ await SendPayloadAsync(
{
if (request is IMultipartApiRequest multipartRequest)
{
return multipartRequest.PostAsync(payloadArray);
return multipartRequest.PostAsync(payloadArray, payload.UseEvpProxy ? MultipartCompression.None : MultipartCompression.GZip);
}

MultipartApiRequestNotSupported.Throw();
Expand Down
4 changes: 2 additions & 2 deletions tracer/src/Datadog.Trace/Ci/IntelligentTestRunnerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -583,9 +583,9 @@ async Task<long> InternalSendObjectsPackFileAsync(string packFile, bool finalTry

try
{
using var response = await multipartRequest.PostAsync(
using var response = await multipartRequest.PostAsync([
new MultipartFormItem("pushedSha", MimeTypes.Json, null, new ArraySegment<byte>(jsonPushedShaBytes)),
new MultipartFormItem("packfile", "application/octet-stream", null, fileStream))
new MultipartFormItem("packfile", "application/octet-stream", null, fileStream)])
.ConfigureAwait(false);
var responseContent = await response.ReadAsStringAsync().ConfigureAwait(false);
if (TelemetryHelper.GetErrorTypeFromStatusCode(response.StatusCode) is { } errorType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Datadog.Trace.Agent;
using Datadog.Trace.Agent.Transports;
using Datadog.Trace.Ci;
using Datadog.Trace.Ci.Agent;
using Datadog.Trace.Ci.Coverage.Models.Tests;
Expand All @@ -18,6 +24,7 @@
using Datadog.Trace.Vendors.MessagePack;
using Datadog.Trace.Vendors.Newtonsoft.Json;
using Datadog.Trace.Vendors.Newtonsoft.Json.Linq;
using FluentAssertions;
using Moq;
using Xunit;

Expand Down Expand Up @@ -140,6 +147,29 @@ public async Task AgentlessCodeCoverageEvent()
}
}

#if NETCOREAPP3_1_OR_GREATER
[Fact]
public async Task AgentlessCodeCoverageCompressedPayloadTest()
{
var requestFactory = new HttpClientRequestFactory(new Uri("http://localhost"), Array.Empty<KeyValuePair<string, string>>(), new TestMessageHandler());
var apiRequest = (IMultipartApiRequest)requestFactory.Create(new Uri("http://localhost/api"));
var response = await apiRequest.PostAsync(
new[] { new MultipartFormItem("TestName", "application/binary", "TestFileName", "TestContent"u8.ToArray()) },
MultipartCompression.GZip)
.ConfigureAwait(false);
var stream = await response.GetStreamAsync().ConfigureAwait(false);
using var unzippedStream = new GZipStream(stream, CompressionMode.Decompress, leaveOpen: true);
var ms = new MemoryStream();
await unzippedStream.CopyToAsync(ms).ConfigureAwait(false);
ms.Position = 0;
using var rs = new StreamReader(ms, Encoding.UTF8);
var requestContent = await rs.ReadToEndAsync().ConfigureAwait(false);
requestContent.Should().Contain("Content-Type: application/binary");
requestContent.Should().Contain("Content-Disposition: form-data; name=TestName; filename=TestFileName; filename*=utf-8''TestFileName");
requestContent.Should().Contain("TestContent");
}
#endif

[Fact]
public async Task SlowSenderTest()
{
Expand Down Expand Up @@ -325,5 +355,17 @@ public void CoverageBufferTest()
bufferSize *= 2;
}
}

internal class TestMessageHandler : HttpMessageHandler
{
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
var ms = new MemoryStream();
await request.Content.CopyToAsync(ms).ConfigureAwait(false);
var responseMessage = new HttpResponseMessage(HttpStatusCode.OK);
responseMessage.Content = new ByteArrayContent(ms.ToArray());
return responseMessage;
}
}
}
}
Loading