From 33d975e0464e4682cfe24f0bf4720c97df4be3b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Thu, 1 Aug 2024 12:33:13 +0200 Subject: [PATCH 1/4] sdk/log: SimpleProcessor synchronizes OnEmit calls --- sdk/log/simple.go | 4 ++++ sdk/log/simple_test.go | 24 +++++++++++++++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sdk/log/simple.go b/sdk/log/simple.go index 8b6c34dc080..501d00c4a00 100644 --- a/sdk/log/simple.go +++ b/sdk/log/simple.go @@ -13,6 +13,7 @@ var _ Processor = (*SimpleProcessor)(nil) // SimpleProcessor is an processor that synchronously exports log records. type SimpleProcessor struct { + mu sync.Mutex exporter Exporter } @@ -41,6 +42,9 @@ var simpleProcRecordsPool = sync.Pool{ // OnEmit batches provided log record. func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error { + s.mu.Lock() + defer s.mu.Unlock() + records := simpleProcRecordsPool.Get().(*[]Record) (*records)[0] = *r defer func() { diff --git a/sdk/log/simple_test.go b/sdk/log/simple_test.go index dbc91a90156..82f7f4eb62e 100644 --- a/sdk/log/simple_test.go +++ b/sdk/log/simple_test.go @@ -5,6 +5,8 @@ package log_test import ( "context" + "io" + "strings" "sync" "testing" @@ -69,6 +71,25 @@ func TestSimpleProcessorForceFlush(t *testing.T) { require.True(t, e.forceFlushCalled, "exporter ForceFlush not called") } +type writerExporter struct { + io.Writer +} + +func (e *writerExporter) Export(_ context.Context, records []log.Record) error { + for _, r := range records { + _, _ = io.WriteString(e.Writer, r.Body().String()) + } + return nil +} + +func (e *writerExporter) Shutdown(context.Context) error { + return nil +} + +func (e *writerExporter) ForceFlush(context.Context) error { + return nil +} + func TestSimpleProcessorConcurrentSafe(t *testing.T) { const goRoutineN = 10 @@ -78,7 +99,8 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) { r := new(log.Record) r.SetSeverityText("test") ctx := context.Background() - s := log.NewSimpleProcessor(nil) + e := &writerExporter{new(strings.Builder)} + s := log.NewSimpleProcessor(e) for i := 0; i < goRoutineN; i++ { go func() { defer wg.Done() From 5f22652fa4c04e6e427a963fccfd449b318a2695 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Thu, 1 Aug 2024 12:39:40 +0200 Subject: [PATCH 2/4] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0cc72e908f..6602167c073 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed - `Processor.OnEmit` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5636) +- `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` synchronizes `OnEmit` calls. (#5666) ### Fixed From a68c20d123243794a3ecd051431fbdb00f762662 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Thu, 1 Aug 2024 14:03:59 +0200 Subject: [PATCH 3/4] Update docs --- sdk/log/exporter.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index 1cdddc03e39..9a42c1246e1 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -15,10 +15,6 @@ import ( ) // Exporter handles the delivery of log records to external receivers. -// -// Any of the Exporter's methods may be called concurrently with itself -// or with other methods. It is the responsibility of the Exporter to manage -// this concurrency. type Exporter interface { // Export transmits log records to a receiver. // @@ -34,6 +30,9 @@ type Exporter interface { // // Before modifying a Record, the implementation must use Record.Clone // to create a copy that shares no state with the original. + // + // Export should never be called concurrently with other Export calls. + // However, it may be called concurrently with other methods. Export(ctx context.Context, records []Record) error // Shutdown is called when the SDK shuts down. Any cleanup or release of // resources held by the exporter should be done in this call. @@ -43,12 +42,16 @@ type Exporter interface { // // After Shutdown is called, calls to Export, Shutdown, or ForceFlush // should perform no operation and return nil error. + // + // Shutdown may be called concurrently with itself or with other methods. Shutdown(ctx context.Context) error // ForceFlush exports log records to the configured Exporter that have not yet // been exported. // // The deadline or cancellation of the passed context must be honored. An // appropriate error should be returned in these situations. + // + // ForceFlush may be called concurrently with itself or with other methods. ForceFlush(ctx context.Context) error } From b40deba22389dffd30c53259d0a6a0750d15af22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Thu, 1 Aug 2024 14:05:41 +0200 Subject: [PATCH 4/4] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6602167c073..86260eccd98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed - `Processor.OnEmit` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5636) +- Update the concurrency requirements of `Exporter` in `go.opentelemetry.io/otel/sdk/log`. (#5666) - `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` synchronizes `OnEmit` calls. (#5666) ### Fixed