Skip to content

Commit

Permalink
[Internal] AI integration: Refactor code how container and database n…
Browse files Browse the repository at this point in the history
…ame is flowing to opentelemetry module (#3532)

* wip

* WIP

* Revert "WIP"

This reverts commit 71275de54b9e67fa54a37e79d450b9597e173934.

* Revert "wip"

This reverts commit 586fa9865cc3f40dabd7ef90fb3e0cf499a045bc.

* wip add containe and database info

* redesign how container and database name information flows into opne telemetry data

* test fix

* fix test

* fix tests

* fix typos

* baseline test fix

Co-authored-by: Sourabh Jain <sourabhjain@microsoft.com>
  • Loading branch information
sourabh1007 and sourabh1007 authored Nov 15, 2022
1 parent ab1f249 commit 9fb3a12
Show file tree
Hide file tree
Showing 43 changed files with 1,014 additions and 731 deletions.
15 changes: 8 additions & 7 deletions Microsoft.Azure.Cosmos/src/Batch/BatchCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,12 @@ public override Task<TransactionalBatchResponse> ExecuteAsync(
CancellationToken cancellationToken = default)
{
return this.container.ClientContext.OperationHelperAsync(
nameof(ExecuteAsync),
requestOptions,
(trace) =>
operationName: nameof(ExecuteAsync),
containerName: this.container.Id,
databaseName: this.container.Database.Id,
operationType: Documents.OperationType.Replace,
requestOptions: requestOptions,
task: (trace) =>
{
BatchExecutor executor = new BatchExecutor(
container: this.container,
Expand All @@ -232,10 +235,8 @@ public override Task<TransactionalBatchResponse> ExecuteAsync(
this.operations = new List<ItemBatchOperation>();
return executor.ExecuteAsync(trace, cancellationToken);
},
(response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id));
openTelemetry: (response) => new OpenTelemetryResponse(
responseMessage: response));
}

/// <summary>
Expand Down
10 changes: 5 additions & 5 deletions Microsoft.Azure.Cosmos/src/ChangeFeed/ChangeFeedIteratorCore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ namespace Microsoft.Azure.Cosmos.ChangeFeed
using Microsoft.Azure.Cosmos.Pagination;
using Microsoft.Azure.Cosmos.Query.Core;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Documents;

internal sealed class ChangeFeedIteratorCore : FeedIteratorInternal
{
Expand Down Expand Up @@ -222,12 +222,12 @@ public ChangeFeedIteratorCore(
public override async Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
{
return await this.clientContext.OperationHelperAsync("Change Feed Iterator Read Next Async",
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id ?? this.databaseName,
operationType: OperationType.ReadFeed,
requestOptions: this.changeFeedRequestOptions,
task: (trace) => this.ReadNextInternalAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id),
openTelemetry: (response) => new OpenTelemetryResponse(responseMessage: response),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,15 @@ public override CosmosElement GetCosmosElementContinuationToken()
/// <returns>A change feed response from cosmos service</returns>
public override Task<ResponseMessage> ReadNextAsync(CancellationToken cancellationToken = default)
{
return this.clientContext.OperationHelperAsync("Change Feed Processor Read Next Async",
return this.clientContext.OperationHelperAsync(
operationName: "Change Feed Processor Read Next Async",
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id ?? this.databaseName,
operationType: Documents.OperationType.ReadFeed,
requestOptions: this.changeFeedOptions,
task: (trace) => this.ReadNextAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse(
responseMessage: response,
containerName: this.container?.Id,
databaseName: this.container?.Database?.Id),
responseMessage: response),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,14 @@ private ChangeFeedEstimatorIterator(

public override Task<FeedResponse<ChangeFeedProcessorState>> ReadNextAsync(CancellationToken cancellationToken = default)
{
return this.monitoredContainer.ClientContext.OperationHelperAsync("Change Feed Estimator Read Next Async",
return this.monitoredContainer.ClientContext.OperationHelperAsync(
operationName: "Change Feed Estimator Read Next Async",
containerName: this.monitoredContainer?.Id,
databaseName: this.monitoredContainer?.Database?.Id,
operationType: Documents.OperationType.ReadFeed,
requestOptions: null,
task: (trace) => this.ReadNextAsync(trace, cancellationToken),
openTelemetry: (response) => new OpenTelemetryResponse<ChangeFeedProcessorState>(
responseMessage: response,
containerName: this.monitoredContainer?.Id,
databaseName: this.monitoredContainer?.Database?.Id ?? this.databaseName),
openTelemetry: (response) => new OpenTelemetryResponse<ChangeFeedProcessorState>(responseMessage: response),
traceComponent: TraceComponent.ChangeFeed,
traceLevel: TraceLevel.Info);
}
Expand Down
164 changes: 86 additions & 78 deletions Microsoft.Azure.Cosmos/src/CosmosClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ namespace Microsoft.Azure.Cosmos
using Microsoft.Azure.Cosmos.Handlers;
using Microsoft.Azure.Cosmos.Query.Core.Monads;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Routing;
using Microsoft.Azure.Cosmos.Telemetry;
using Microsoft.Azure.Cosmos.Tracing;
using Microsoft.Azure.Cosmos.Tracing.TraceData;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -629,9 +627,12 @@ internal CosmosClient(
public virtual Task<AccountProperties> ReadAccountAsync()
{
return this.ClientContext.OperationHelperAsync(
nameof(ReadAccountAsync),
null,
(trace) => ((IDocumentClientInternal)this.DocumentClient).GetDatabaseAccountInternalAsync(this.Endpoint));
operationName: nameof(ReadAccountAsync),
containerName: null,
databaseName: null,
operationType: OperationType.Read,
requestOptions: null,
task: (trace) => ((IDocumentClientInternal)this.DocumentClient).GetDatabaseAccountInternalAsync(this.Endpoint));
}

/// <summary>
Expand Down Expand Up @@ -715,9 +716,12 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
}

return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseAsync),
requestOptions,
(trace) =>
operationName: nameof(CreateDatabaseAsync),
containerName: null,
databaseName: id,
operationType: OperationType.Create,
requestOptions: requestOptions,
task: (trace) =>
{
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
ThroughputProperties throughputProperties = ThroughputProperties.CreateManualThroughput(throughput);
Expand All @@ -729,10 +733,7 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
trace: trace,
cancellationToken: cancellationToken);
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response,
containerName: null,
databaseName: response.Resource?.Id));
openTelemetry: (response) => new OpenTelemetryResponse<DatabaseProperties>(responseMessage: response));
}

/// <summary>
Expand Down Expand Up @@ -765,9 +766,12 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
}

return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseAsync),
requestOptions,
(trace) =>
operationName: nameof(CreateDatabaseAsync),
containerName: null,
databaseName: id,
operationType: OperationType.Create,
requestOptions: requestOptions,
task: (trace) =>
{
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
return this.CreateDatabaseInternalAsync(
Expand All @@ -777,10 +781,7 @@ public virtual Task<DatabaseResponse> CreateDatabaseAsync(
trace: trace,
cancellationToken: cancellationToken);
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response,
containerName: null,
databaseName: response.Resource?.Id));
openTelemetry: (response) => new OpenTelemetryResponse<DatabaseProperties>(responseMessage: response));
}

/// <summary>
Expand Down Expand Up @@ -824,59 +825,60 @@ public virtual Task<DatabaseResponse> CreateDatabaseIfNotExistsAsync(
return string.IsNullOrEmpty(id)
? throw new ArgumentNullException(nameof(id))
: this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseIfNotExistsAsync),
requestOptions,
async (trace) =>
{
double totalRequestCharge = 0;
// Doing a Read before Create will give us better latency for existing databases
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
DatabaseCore database = (DatabaseCore)this.GetDatabase(id);
using (ResponseMessage readResponse = await database.ReadStreamAsync(
operationName: nameof(CreateDatabaseIfNotExistsAsync),
containerName: null,
databaseName: id,
operationType: OperationType.Create,
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken))
{
totalRequestCharge = readResponse.Headers.RequestCharge;
if (readResponse.StatusCode != HttpStatusCode.NotFound)
task: async (trace) =>
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(database, readResponse);
}
}

using (ResponseMessage createResponse = await this.CreateDatabaseStreamInternalAsync(
databaseProperties,
throughputProperties,
requestOptions,
trace,
cancellationToken))
{
totalRequestCharge += createResponse.Headers.RequestCharge;
createResponse.Headers.RequestCharge = totalRequestCharge;

if (createResponse.StatusCode != HttpStatusCode.Conflict)
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), createResponse);
}
}

// This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create
// so for the remaining ones we should do a Read instead of throwing Conflict exception
using (ResponseMessage readResponseAfterConflict = await database.ReadStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken))
{
totalRequestCharge += readResponseAfterConflict.Headers.RequestCharge;
readResponseAfterConflict.Headers.RequestCharge = totalRequestCharge;

return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), readResponseAfterConflict);
}
},
(response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response,
containerName: null,
databaseName: response.Resource?.Id));
double totalRequestCharge = 0;
// Doing a Read before Create will give us better latency for existing databases
DatabaseProperties databaseProperties = this.PrepareDatabaseProperties(id);
DatabaseCore database = (DatabaseCore)this.GetDatabase(id);
using (ResponseMessage readResponse = await database.ReadStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken))
{
totalRequestCharge = readResponse.Headers.RequestCharge;
if (readResponse.StatusCode != HttpStatusCode.NotFound)
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(database, readResponse);
}
}

using (ResponseMessage createResponse = await this.CreateDatabaseStreamInternalAsync(
databaseProperties,
throughputProperties,
requestOptions,
trace,
cancellationToken))
{
totalRequestCharge += createResponse.Headers.RequestCharge;
createResponse.Headers.RequestCharge = totalRequestCharge;

if (createResponse.StatusCode != HttpStatusCode.Conflict)
{
return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), createResponse);
}
}

// This second Read is to handle the race condition when 2 or more threads have Read the database and only one succeeds with Create
// so for the remaining ones we should do a Read instead of throwing Conflict exception
using (ResponseMessage readResponseAfterConflict = await database.ReadStreamAsync(
requestOptions: requestOptions,
trace: trace,
cancellationToken: cancellationToken))
{
totalRequestCharge += readResponseAfterConflict.Headers.RequestCharge;
readResponseAfterConflict.Headers.RequestCharge = totalRequestCharge;

return this.ClientContext.ResponseFactory.CreateDatabaseResponse(this.GetDatabase(databaseProperties.Id), readResponseAfterConflict);
}
},
openTelemetry: (response) => new OpenTelemetryResponse<DatabaseProperties>(
responseMessage: response));
}

/// <summary>
Expand Down Expand Up @@ -1165,9 +1167,12 @@ public virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
}

return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseStreamAsync),
requestOptions,
(trace) =>
operationName: nameof(CreateDatabaseStreamAsync),
containerName: null,
databaseName: databaseProperties.Id,
operationType: OperationType.Create,
requestOptions: requestOptions,
task: (trace) =>
{
this.ClientContext.ValidateResource(databaseProperties.Id);
return this.CreateDatabaseStreamInternalAsync(
Expand All @@ -1177,7 +1182,7 @@ public virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
trace,
cancellationToken);
},
(response) => new OpenTelemetryResponse(response));
openTelemetry: (response) => new OpenTelemetryResponse(response));
}

/// <summary>
Expand Down Expand Up @@ -1260,9 +1265,12 @@ internal virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
}

return this.ClientContext.OperationHelperAsync(
nameof(CreateDatabaseIfNotExistsAsync),
requestOptions,
(trace) =>
operationName: nameof(CreateDatabaseIfNotExistsAsync),
containerName: null,
databaseName: databaseProperties.Id,
operationType: OperationType.Create,
requestOptions: requestOptions,
task: (trace) =>
{
this.ClientContext.ValidateResource(databaseProperties.Id);
return this.CreateDatabaseStreamInternalAsync(
Expand All @@ -1272,7 +1280,7 @@ internal virtual Task<ResponseMessage> CreateDatabaseStreamAsync(
trace,
cancellationToken);
},
(response) => new OpenTelemetryResponse(response));
openTelemetry: (response) => new OpenTelemetryResponse(response));
}

private async Task<DatabaseResponse> CreateDatabaseInternalAsync(
Expand Down
Loading

0 comments on commit 9fb3a12

Please sign in to comment.