-
-
Notifications
You must be signed in to change notification settings - Fork 323
/
AzureStorageBlobDataProvider.cs
292 lines (264 loc) · 11.9 KB
/
AzureStorageBlobDataProvider.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
using Audit.Core;
using System;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Audit.AzureStorageBlobs.ConfigurationApi;
using Azure.Storage;
using Azure;
using Azure.Core;
using System.Collections.Generic;
using System.Collections.Concurrent;
using Azure.Storage.Blobs.Models;
using System.Threading;
namespace Audit.AzureStorageBlobs.Providers
{
public class AzureStorageBlobDataProvider : AuditDataProvider
{
/// <summary>
/// Azure Blob connection string. When not null it is used to build the service client and
/// <see cref="ServiceUrl"/> and the credential properties are not consulted.
/// </summary>
public string ConnectionString { get; set; }
/// <summary>
/// Azure Blob Container name. Evaluated against the audit event when writing; resolved through
/// <c>GetDefault()</c> when reading an event back by blob name only.
/// </summary>
public Setting<string> ContainerName { get; set; }
/// <summary>
/// Azure Blob name builder. When it yields null for a new event, a name of the form
/// <c>{guid}.json</c> is generated instead.
/// </summary>
public Setting<string> BlobName { get; set; }
/// <summary>
/// The Azure.Storage.Blobs Client Options passed to every service client this provider creates.
/// </summary>
public BlobClientOptions ClientOptions { get; set; }
/// <summary>
/// The Service URL to connect to. Alternative to ConnectionString; only used when ConnectionString is null.
/// </summary>
public string ServiceUrl { get; set; }
/// <summary>
/// The Shared Key credential to use to connect to the Service URL.
/// Used only when <see cref="SasCredential"/> is null.
/// </summary>
public StorageSharedKeyCredential SharedKeyCredential { get; set; }
/// <summary>
/// The Sas credential to use to connect to the Service URL.
/// Checked before the other credential properties.
/// </summary>
public AzureSasCredential SasCredential { get; set; }
/// <summary>
/// The Token credential to use to connect to the Service URL.
/// Used only when both <see cref="SasCredential"/> and <see cref="SharedKeyCredential"/> are null.
/// If no credential is set at all, the Service URL is accessed without credentials.
/// </summary>
public TokenCredential TokenCredential { get; set; }
/// <summary>
/// Gets or sets the standard blob tier to use (or null to use the default).
/// Evaluated against the audit event on every upload.
/// </summary>
public Setting<AccessTier?> AccessTier { get; set; }
/// <summary>
/// Gets or sets the metadata key/values to store on the blob.
/// Evaluated against the audit event on every upload.
/// </summary>
public Setting<IDictionary<string, string>> Metadata { get; set; }
/// <summary>
/// Gets or sets the Tags to store on the blob.
/// Evaluated against the audit event on every upload.
/// </summary>
public Setting<IDictionary<string, string>> Tags { get; set; }
// Container clients already created, keyed by "{ConnectionString ?? ServiceUrl}|{containerName}".
// The field is static, so the cache is shared by every provider instance; this class never removes entries.
private static readonly IDictionary<string, BlobContainerClient> ContainerClientCache = new ConcurrentDictionary<string, BlobContainerClient>();
/// <summary>
/// Creates an unconfigured provider; connection, container and blob settings are expected to be
/// assigned through the public properties afterwards.
/// </summary>
public AzureStorageBlobDataProvider() { }
/// <summary>
/// Creates a provider and populates its properties from the fluent configuration API.
/// </summary>
/// <param name="config">Callback that receives the configurator to fill in.</param>
public AzureStorageBlobDataProvider(Action<IAzureBlobConnectionConfigurator> config)
{
    var configurator = new AzureBlobConnectionConfigurator();
    config.Invoke(configurator);

    ConnectionString = configurator._connectionString;

    // Container-level settings
    var container = configurator._containerConfig;
    ContainerName = container._containerName;
    BlobName = container._blobName;
    ClientOptions = container._clientOptions;
    AccessTier = container._accessTier;
    Metadata = container._metadata;
    Tags = container._tags;

    // Credential settings are only present when the Service URL flavor was configured
    var credentials = configurator._credentialConfig;
    if (credentials == null)
    {
        return;
    }

    ServiceUrl = credentials._serviceUrl;
    SharedKeyCredential = credentials._sharedKeyCredential;
    SasCredential = credentials._sasCredential;
    TokenCredential = credentials._tokenCredential;
}
/// <summary>
/// Returns the Azure.Storage.Blobs.BlobContainerClient to use for the given AuditEvent.
/// The container name is resolved from <see cref="ContainerName"/> for that event.
/// </summary>
/// <param name="auditEvent">The audit event used to resolve the container name.</param>
public BlobContainerClient GetContainerClient(AuditEvent auditEvent)
    => EnsureContainerClient(ContainerName.GetValue(auditEvent));
/// <summary>
/// Returns the cached container client for the current connection and container name,
/// creating the client (and the container, if it does not exist) on first use.
/// </summary>
/// <param name="containerName">Name of the blob container.</param>
private BlobContainerClient EnsureContainerClient(string containerName)
{
    var key = $"{ConnectionString ?? ServiceUrl}|{containerName}";
    if (!ContainerClientCache.TryGetValue(key, out BlobContainerClient container))
    {
        // Not cached yet: build the client, make sure the container exists, then remember it
        container = CreateBlobServiceClient().GetBlobContainerClient(containerName);
        container.CreateIfNotExists();
        ContainerClientCache[key] = container;
    }
    return container;
}
/// <summary>
/// Asynchronously returns the Azure.Storage.Blobs.BlobContainerClient to use for the given AuditEvent.
/// The container name is resolved from <see cref="ContainerName"/> for that event.
/// </summary>
/// <param name="auditEvent">The audit event used to resolve the container name.</param>
/// <param name="cancellationToken">The cancellation token</param>
public Task<BlobContainerClient> GetContainerClientAsync(AuditEvent auditEvent, CancellationToken cancellationToken)
    => EnsureContainerClientAsync(ContainerName.GetValue(auditEvent), cancellationToken);
/// <summary>
/// Returns the cached container client for the current connection and container name,
/// creating the client (and the container, if it does not exist) on first use.
/// </summary>
/// <remarks>
/// No lock is held between the cache lookup and the insert, so concurrent first calls for the
/// same key may each issue the create-if-not-exists request; the last one stored wins.
/// </remarks>
/// <param name="containerName">Name of the blob container.</param>
/// <param name="cancellationToken">The cancellation token</param>
private async Task<BlobContainerClient> EnsureContainerClientAsync(string containerName, CancellationToken cancellationToken)
{
    var cacheKey = $"{ConnectionString ?? ServiceUrl}|{containerName}";
    if (ContainerClientCache.TryGetValue(cacheKey, out BlobContainerClient cached))
    {
        // Cache hit
        return cached;
    }
    // Cache miss
    var serviceClient = CreateBlobServiceClient();
    var containerClient = serviceClient.GetBlobContainerClient(containerName);
    // Library code: do not resume on the caller's synchronization context
    await containerClient.CreateIfNotExistsAsync(default, default, default, cancellationToken).ConfigureAwait(false);
    // Only cache once the container is known to exist
    ContainerClientCache[cacheKey] = containerClient;
    return containerClient;
}
/// <summary>
/// Builds a new BlobServiceClient from the configured connection settings.
/// The connection string wins when present; otherwise the Service URL is used with the first
/// available credential (SAS, then Shared Key, then Token), or with no credential at all.
/// </summary>
private BlobServiceClient CreateBlobServiceClient()
{
    if (ConnectionString != null)
    {
        return new BlobServiceClient(ConnectionString, ClientOptions);
    }

    var endpoint = new Uri(ServiceUrl);

    if (SasCredential != null)
    {
        // Using SAS credential
        return new BlobServiceClient(endpoint, SasCredential, ClientOptions);
    }

    if (SharedKeyCredential != null)
    {
        // Using Shared Key credential
        return new BlobServiceClient(endpoint, SharedKeyCredential, ClientOptions);
    }

    if (TokenCredential != null)
    {
        // Using Token Credential
        return new BlobServiceClient(endpoint, TokenCredential, ClientOptions);
    }

    // Anonymous by service URL
    return new BlobServiceClient(endpoint, ClientOptions);
}
/// <summary>
/// Serializes the audit event as JSON and uploads it to the given container.
/// </summary>
/// <param name="client">The container client to upload to.</param>
/// <param name="auditEvent">The audit event to store.</param>
/// <param name="existingBlobName">Name of the blob to write to, or null to resolve a name from
/// <see cref="BlobName"/> (falling back to a generated "{guid}.json").</param>
/// <returns>The name of the blob that was written.</returns>
protected string Upload(BlobContainerClient client, AuditEvent auditEvent, string existingBlobName)
{
    var targetName = existingBlobName ?? BlobName.GetValue(auditEvent) ?? $"{Guid.NewGuid()}.json";
    var targetBlob = client.GetBlobClient(targetName);

    // Per-event upload settings
    var uploadOptions = new BlobUploadOptions();
    uploadOptions.Metadata = Metadata.GetValue(auditEvent);
    uploadOptions.AccessTier = AccessTier.GetValue(auditEvent);
    uploadOptions.Tags = Tags.GetValue(auditEvent);

    var payload = new BinaryData(auditEvent, Configuration.JsonSettings);
    targetBlob.Upload(payload, uploadOptions);
    return targetName;
}
/// <summary>
/// Serializes the audit event as JSON and asynchronously uploads it to the given container.
/// </summary>
/// <param name="client">The container client to upload to.</param>
/// <param name="auditEvent">The audit event to store.</param>
/// <param name="existingBlobName">Name of the blob to write to, or null to resolve a name from
/// <see cref="BlobName"/> (falling back to a generated "{guid}.json").</param>
/// <param name="cancellationToken">The cancellation token</param>
/// <returns>The name of the blob that was written.</returns>
protected async Task<string> UploadAsync(BlobContainerClient client, AuditEvent auditEvent, string existingBlobName, CancellationToken cancellationToken)
{
    var blobName = existingBlobName ?? BlobName.GetValue(auditEvent) ?? string.Format("{0}.json", Guid.NewGuid());
    var blob = client.GetBlobClient(blobName);
    var options = new BlobUploadOptions()
    {
        Metadata = Metadata.GetValue(auditEvent),
        AccessTier = AccessTier.GetValue(auditEvent),
        Tags = Tags.GetValue(auditEvent)
    };
    // Library code: do not resume on the caller's synchronization context
    await blob.UploadAsync(new BinaryData(auditEvent, Core.Configuration.JsonSettings), options, cancellationToken).ConfigureAwait(false);
    return blobName;
}
#region Public
/// <inheritdoc />
/// <remarks>The returned event id is the name of the blob the event was uploaded to.</remarks>
public override object InsertEvent(AuditEvent auditEvent)
{
    // Passing null as the existing name lets Upload resolve (or generate) the blob name
    return Upload(GetContainerClient(auditEvent), auditEvent, null);
}
/// <inheritdoc />
/// <remarks>The returned event id is the name of the blob the event was uploaded to.</remarks>
public override async Task<object> InsertEventAsync(AuditEvent auditEvent, CancellationToken cancellationToken = default)
{
    // Library code: ConfigureAwait(false) avoids capturing the caller's synchronization context
    var client = await GetContainerClientAsync(auditEvent, cancellationToken).ConfigureAwait(false);
    // Passing null as the existing name lets UploadAsync resolve (or generate) the blob name
    var blobName = await UploadAsync(client, auditEvent, null, cancellationToken).ConfigureAwait(false);
    return blobName;
}
/// <inheritdoc />
/// <remarks>The event id is converted to a string and used as the name of the blob to overwrite.</remarks>
public override void ReplaceEvent(object eventId, AuditEvent auditEvent)
{
    Upload(GetContainerClient(auditEvent), auditEvent, eventId.ToString());
}
/// <inheritdoc />
/// <remarks>The event id is converted to a string and used as the name of the blob to overwrite.</remarks>
public override async Task ReplaceEventAsync(object eventId, AuditEvent auditEvent, CancellationToken cancellationToken = default)
{
    // Library code: ConfigureAwait(false) avoids capturing the caller's synchronization context
    var client = await GetContainerClientAsync(auditEvent, cancellationToken).ConfigureAwait(false);
    await UploadAsync(client, auditEvent, eventId.ToString(), cancellationToken).ConfigureAwait(false);
}
/// <inheritdoc />
/// <remarks>The event is looked up in the container given by the default value of <see cref="ContainerName"/>.</remarks>
public override T GetEvent<T>(object blobName)
    => GetEvent<T>(ContainerName.GetDefault(), blobName.ToString());
/// <inheritdoc />
/// <remarks>The event is looked up in the container given by the default value of <see cref="ContainerName"/>.</remarks>
public override async Task<T> GetEventAsync<T>(object blobName, CancellationToken cancellationToken = default)
{
    var containerName = ContainerName.GetDefault();
    // Library code: ConfigureAwait(false) avoids capturing the caller's synchronization context
    return await GetEventAsync<T>(containerName, blobName.ToString(), cancellationToken).ConfigureAwait(false);
}
/// <summary>
/// Reads an event back from blob storage, given its container name and blob name.
/// </summary>
/// <typeparam name="T">The event type</typeparam>
/// <param name="containerName">Container name</param>
/// <param name="blobName">Blob name</param>
/// <returns>The deserialized event, or the default value of <typeparamref name="T"/> when the blob does not exist.</returns>
public T GetEvent<T>(string containerName, string blobName)
{
    var blob = EnsureContainerClient(containerName).GetBlobClient(blobName);

    bool blobExists = blob.Exists();
    if (!blobExists)
    {
        return default;
    }

    var download = blob.DownloadContent();
    return download.Value.Content.ToObjectFromJson<T>(Core.Configuration.JsonSettings);
}
/// <summary>
/// Asynchronously reads an event back from blob storage, given its container name and blob name.
/// </summary>
/// <typeparam name="T">The event type</typeparam>
/// <param name="containerName">Container name</param>
/// <param name="blobName">Blob name</param>
/// <param name="cancellationToken">The cancellation token</param>
/// <returns>The deserialized event, or the default value of <typeparamref name="T"/> when the blob does not exist.</returns>
public async Task<T> GetEventAsync<T>(string containerName, string blobName, CancellationToken cancellationToken = default)
{
    // Library code: ConfigureAwait(false) on every await avoids capturing the caller's synchronization context
    var client = await EnsureContainerClientAsync(containerName, cancellationToken).ConfigureAwait(false);
    var blobClient = client.GetBlobClient(blobName);
    if (await blobClient.ExistsAsync(cancellationToken).ConfigureAwait(false))
    {
        var result = await blobClient.DownloadContentAsync(cancellationToken).ConfigureAwait(false);
        return result.Value.Content.ToObjectFromJson<T>(Core.Configuration.JsonSettings);
    }
    // Missing blob is reported as the default value rather than an exception
    return default;
}
#endregion
}
}