Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CIVisibility] - Add support for GZip compression in Multipart payloads #5060

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tracer/src/Datadog.Trace/Agent/IMultipartApiRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
#nullable enable

using System.Threading.Tasks;
using Datadog.Trace.Agent.Transports;

namespace Datadog.Trace.Agent
{
internal interface IMultipartApiRequest
{
Task<IApiResponse> PostAsync(params MultipartFormItem[] items);
Task<IApiResponse> PostAsync(MultipartFormItem[] items, MultipartCompression multipartCompression = MultipartCompression.None);
}
}
28 changes: 22 additions & 6 deletions tracer/src/Datadog.Trace/Agent/Transports/ApiWebRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

using System;
using System.IO;
using System.IO.Compression;
using System.Net;
using System.Text;
using System.Threading.Tasks;
Expand Down Expand Up @@ -74,8 +75,9 @@ public async Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStrea
/// WARNING: Name and FileName of each MultipartFormItem instance must be ASCII encoding compatible.
/// </summary>
/// <param name="items">Multipart form data items</param>
/// <param name="multipartCompression">Multipart compression</param>
/// <returns>Task with the response</returns>
public async Task<IApiResponse> PostAsync(params MultipartFormItem[] items)
public async Task<IApiResponse> PostAsync(MultipartFormItem[] items, MultipartCompression multipartCompression = MultipartCompression.None)
{
if (items is null)
{
Expand All @@ -84,17 +86,33 @@ public async Task<IApiResponse> PostAsync(params MultipartFormItem[] items)

Log.Debug<int>("Sending multipart form request with {Count} items.", items.Length);

ResetRequest(method: "POST", contentType: "multipart/form-data; boundary=" + Boundary, contentEncoding: null);
ResetRequest(method: "POST", contentType: "multipart/form-data; boundary=" + Boundary, contentEncoding: multipartCompression == MultipartCompression.GZip ? "gzip" : null);
using (var reqStream = await _request.GetRequestStreamAsync().ConfigureAwait(false))
{
if (multipartCompression == MultipartCompression.GZip)
{
Log.Debug("Using MultipartCompression.GZip");
using var gzip = new GZipStream(reqStream, CompressionMode.Compress, true);
tonyredondo marked this conversation as resolved.
Show resolved Hide resolved
await WriteToStreamAsync(items, gzip).ConfigureAwait(false);
Log.Debug("Compressing multipart payload...");
}
else
{
await WriteToStreamAsync(items, reqStream).ConfigureAwait(false);
}
}

using (var requestStream = await _request.GetRequestStreamAsync().ConfigureAwait(false))
return await FinishAndGetResponse().ConfigureAwait(false);

async Task WriteToStreamAsync(MultipartFormItem[] multipartItems, Stream requestStream)
{
// Write form request using the boundary
var boundaryBytes = _boundarySeparatorInBytes ??= Encoding.ASCII.GetBytes(BoundarySeparator);
var trailerBytes = _boundaryTrailerInBytes ??= Encoding.ASCII.GetBytes(BoundaryTrailer);

// Write each MultipartFormItem
var itemsWritten = 0;
foreach (var item in items)
foreach (var item in multipartItems)
{
byte[] headerBytes = null;

Expand Down Expand Up @@ -159,8 +177,6 @@ public async Task<IApiResponse> PostAsync(params MultipartFormItem[] items)
await requestStream.WriteAsync(trailerBytes, 0, trailerBytes.Length).ConfigureAwait(false);
}
}

return await FinishAndGetResponse().ConfigureAwait(false);
}

private void ResetRequest(string method, string contentType, string contentEncoding)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// <copyright file="GzipCompressedContent.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

#if NETCOREAPP
using System.IO;
using System.IO.Compression;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Datadog.Trace.Logging;

namespace Datadog.Trace.Agent.Transports;

internal class GzipCompressedContent : HttpContent
{
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<GzipCompressedContent>();

private readonly HttpContent _content;

public GzipCompressedContent(HttpContent content)
{
// Copy original headers
foreach (var header in content.Headers)
{
Headers.TryAddWithoutValidation(header.Key, header.Value);
}

Headers.ContentEncoding.Add("gzip");
_content = content;
}

protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context)
{
Log.Debug("GZip compressing payload...");
#pragma warning disable CA2007
await using var gzip = new GZipStream(stream, CompressionMode.Compress, true);
#pragma warning restore CA2007
await _content.CopyToAsync(gzip).ConfigureAwait(false);
tonyredondo marked this conversation as resolved.
Show resolved Hide resolved
}

protected override bool TryComputeLength(out long length)
{
length = -1;
return false;
}
}

#endif
14 changes: 11 additions & 3 deletions tracer/src/Datadog.Trace/Agent/Transports/HttpClientRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public async Task<IApiResponse> PostAsync(Func<Stream, Task> writeToRequestStrea
return new HttpClientResponse(response);
}

public async Task<IApiResponse> PostAsync(params MultipartFormItem[] items)
public async Task<IApiResponse> PostAsync(MultipartFormItem[] items, MultipartCompression multipartCompression = MultipartCompression.None)
{
if (items is null)
{
Expand All @@ -133,8 +133,6 @@ public async Task<IApiResponse> PostAsync(params MultipartFormItem[] items)
Log.Debug<int>("Sending multipart form request with {Count} items.", items.Length);

using var formDataContent = new MultipartFormDataContent(boundary: Boundary);
_postRequest.Content = formDataContent;

foreach (var item in items)
{
HttpContent content = null;
Expand Down Expand Up @@ -166,6 +164,16 @@ public async Task<IApiResponse> PostAsync(params MultipartFormItem[] items)
}
}

if (multipartCompression == MultipartCompression.GZip)
{
Log.Debug("Using MultipartCompression.GZip");
_postRequest.Content = new GzipCompressedContent(formDataContent);
}
else
{
_postRequest.Content = formDataContent;
}

var response = await _client.SendAsync(_postRequest).ConfigureAwait(false);
return new HttpClientResponse(response);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// <copyright file="MultipartCompression.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable

namespace Datadog.Trace.Agent.Transports;

internal enum MultipartCompression
{
None,
GZip
}
2 changes: 1 addition & 1 deletion tracer/src/Datadog.Trace/Ci/Agent/CIWriterHttpSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ await SendPayloadAsync(
{
if (request is IMultipartApiRequest multipartRequest)
{
return multipartRequest.PostAsync(payloadArray);
return multipartRequest.PostAsync(payloadArray, MultipartCompression.GZip);
}

MultipartApiRequestNotSupported.Throw();
Expand Down
4 changes: 2 additions & 2 deletions tracer/src/Datadog.Trace/Ci/IntelligentTestRunnerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -583,9 +583,9 @@ async Task<long> InternalSendObjectsPackFileAsync(string packFile, bool finalTry

try
{
using var response = await multipartRequest.PostAsync(
using var response = await multipartRequest.PostAsync([
new MultipartFormItem("pushedSha", MimeTypes.Json, null, new ArraySegment<byte>(jsonPushedShaBytes)),
new MultipartFormItem("packfile", "application/octet-stream", null, fileStream))
new MultipartFormItem("packfile", "application/octet-stream", null, fileStream)])
.ConfigureAwait(false);
var responseContent = await response.ReadAsStringAsync().ConfigureAwait(false);
if (TelemetryHelper.GetErrorTypeFromStatusCode(response.StatusCode) is { } errorType)
Expand Down
Loading