Skip to content
This repository has been archived by the owner on Jun 1, 2024. It is now read-only.

add async methods #349

Merged
merged 2 commits into from
Aug 6, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 118 additions & 88 deletions src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Elasticsearch.Net.Specification.SecurityApi;
using Serilog.Debugging;
using Serilog.Events;
using Serilog.Sinks.PeriodicBatching;
Expand Down Expand Up @@ -52,80 +54,36 @@ public ElasticsearchSink(ElasticsearchSinkOptions options)
/// or <see cref="M:Serilog.Sinks.PeriodicBatching.PeriodicBatchingSink.EmitBatchAsync(System.Collections.Generic.IEnumerable{Serilog.Events.LogEvent})" />,
/// not both.
/// </remarks>
protected override void EmitBatch(IEnumerable<LogEvent> events)
protected override async Task EmitBatchAsync(IEnumerable<LogEvent> events)
{
DynamicResponse result;

try
{
result = this.EmitBatchChecked<DynamicResponse>(events);
result = await this.EmitBatchCheckedAsync<DynamicResponse>(events);
}
catch (Exception ex)
{
HandleException(ex, events);
return;
}

// Handle the results from ES, check if there are any errors.
if (result.Success && result.Body?["errors"] == true)
{
var indexer = 0;
var items = result.Body["items"];
foreach (var item in items)
{
if (item["index"] != null && HasProperty(item["index"], "error") && item["index"]["error"] != null)
{
var e = events.ElementAt(indexer);
if (_state.Options.EmitEventFailure.HasFlag(EmitEventFailureHandling.WriteToSelfLog))
{
// ES reports an error, output the error to the selflog
SelfLog.WriteLine(
"Failed to store event with template '{0}' into Elasticsearch. Elasticsearch reports for index {1} the following: {2}",
e.MessageTemplate,
item["index"]["_index"],
_state.Serialize(item["index"]["error"]));
}

if (_state.Options.EmitEventFailure.HasFlag(EmitEventFailureHandling.WriteToFailureSink) &&
_state.Options.FailureSink != null)
{
// Send to a failure sink
try
{
_state.Options.FailureSink.Emit(e);
}
catch (Exception ex)
{
// We do not let this fail too
SelfLog.WriteLine("Caught exception while emitting to sink {1}: {0}", ex,
_state.Options.FailureSink);
}
}

if (_state.Options.EmitEventFailure.HasFlag(EmitEventFailureHandling.RaiseCallback) &&
_state.Options.FailureCallback != null)
{
// Send to a failure callback
try
{
_state.Options.FailureCallback(e);
}
catch (Exception ex)
{
// We do not let this fail too
SelfLog.WriteLine("Caught exception while emitting to callback {1}: {0}", ex,
_state.Options.FailureCallback);
}
}
HandleResponse(events, result);
}

}
indexer++;
}
}
else if (result.Success == false && result.OriginalException != null)
{
HandleException(result.OriginalException, events);
}
/// <summary>
/// Emit a batch of log events, running to completion synchronously.
/// </summary>
/// <param name="events">The events to emit.</param>
/// <returns>Response from Elasticsearch</returns>
protected virtual Task<T> EmitBatchCheckedAsync<T>(IEnumerable<LogEvent> events) where T : class, IElasticsearchResponse, new()
{
// ReSharper disable PossibleMultipleEnumeration
if (events == null || !events.Any())
return Task.FromResult<T>(default(T));

var payload = CreatePlayLoad<T>(events);
return _state.Client.BulkAsync<T>(PostData.MultiJson(payload));
}

/// <summary>
Expand All @@ -139,32 +97,7 @@ protected override void EmitBatch(IEnumerable<LogEvent> events)
if (events == null || !events.Any())
return null;

if (!_state.TemplateRegistrationSuccess && _state.Options.RegisterTemplateFailure == RegisterTemplateRecovery.FailSink)
{
return null;
}

var payload = new List<string>();
foreach (var e in events)
{
var indexName = _state.GetIndexForEvent(e, e.Timestamp.ToUniversalTime());
var action = default(object);

var pipelineName = _state.Options.PipelineNameDecider?.Invoke(e) ?? _state.Options.PipelineName;
if (string.IsNullOrWhiteSpace(pipelineName))
{
action = new { index = new { _index = indexName, _type = _state.Options.TypeName } };
}
else
{
action = new { index = new { _index = indexName, _type = _state.Options.TypeName, pipeline = pipelineName } };
}
var actionJson = _state.Serialize(action);
payload.Add(actionJson);
var sw = new StringWriter();
_state.Formatter.Format(e, sw);
payload.Add(sw.ToString());
}
var payload = CreatePlayLoad<T>(events);
return _state.Client.Bulk<T>(PostData.MultiJson(payload));
}

Expand Down Expand Up @@ -225,11 +158,108 @@ private static bool HasProperty(dynamic settings, string name)
{
if (settings is System.Dynamic.ExpandoObject)
return ((IDictionary<string, object>)settings).ContainsKey(name);

if (settings is System.Dynamic.DynamicObject)
return ((System.Dynamic.DynamicObject)settings).GetDynamicMemberNames().Contains(name);

return settings.GetType().GetProperty(name) != null;
}

private IEnumerable<string> CreatePlayLoad<T>(IEnumerable<LogEvent> events)
where T : class, IElasticsearchResponse, new()
{
if (!_state.TemplateRegistrationSuccess && _state.Options.RegisterTemplateFailure == RegisterTemplateRecovery.FailSink)
{
return null;
}

var payload = new List<string>();
foreach (var e in events)
{
var indexName = _state.GetIndexForEvent(e, e.Timestamp.ToUniversalTime());
var action = default(object);

var pipelineName = _state.Options.PipelineNameDecider?.Invoke(e) ?? _state.Options.PipelineName;
if (string.IsNullOrWhiteSpace(pipelineName))
{
action = new { index = new { _index = indexName, _type = _state.Options.TypeName } };
}
else
{
action = new { index = new { _index = indexName, _type = _state.Options.TypeName, pipeline = pipelineName } };
}
var actionJson = _state.Serialize(action);
payload.Add(actionJson);
var sw = new StringWriter();
_state.Formatter.Format(e, sw);
payload.Add(sw.ToString());
}

return payload;
}

private void HandleResponse(IEnumerable<LogEvent> events, DynamicResponse result)
{
// Handle the results from ES, check if there are any errors.
if (result.Success && result.Body?["errors"] == true)
{
var indexer = 0;
var items = result.Body["items"];
foreach (var item in items)
{
if (item["index"] != null && HasProperty(item["index"], "error") && item["index"]["error"] != null)
{
var e = events.ElementAt(indexer);
if (_state.Options.EmitEventFailure.HasFlag(EmitEventFailureHandling.WriteToSelfLog))
{
// ES reports an error, output the error to the selflog
SelfLog.WriteLine(
"Failed to store event with template '{0}' into Elasticsearch. Elasticsearch reports for index {1} the following: {2}",
e.MessageTemplate,
item["index"]["_index"],
_state.Serialize(item["index"]["error"]));
}

if (_state.Options.EmitEventFailure.HasFlag(EmitEventFailureHandling.WriteToFailureSink) &&
_state.Options.FailureSink != null)
{
// Send to a failure sink
try
{
_state.Options.FailureSink.Emit(e);
}
catch (Exception ex)
{
// We do not let this fail too
SelfLog.WriteLine("Caught exception while emitting to sink {1}: {0}", ex,
_state.Options.FailureSink);
}
}

if (_state.Options.EmitEventFailure.HasFlag(EmitEventFailureHandling.RaiseCallback) &&
_state.Options.FailureCallback != null)
{
// Send to a failure callback
try
{
_state.Options.FailureCallback(e);
}
catch (Exception ex)
{
// We do not let this fail too
SelfLog.WriteLine("Caught exception while emitting to callback {1}: {0}", ex,
_state.Options.FailureCallback);
}
}

}
indexer++;
}
}
else if (result.Success == false && result.OriginalException != null)
{
HandleException(result.OriginalException, events);
}
}
}
}