Skip to content

Commit

Permalink
Fix logs exporter test
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed May 9, 2024
1 parent c5fd34c commit 7fdf150
Showing 1 changed file with 29 additions and 12 deletions.
41 changes: 29 additions & 12 deletions exporter/elasticsearchexporter/logs_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ func TestExporter_PushEvent(t *testing.T) {
})

exporter := newTestExporter(t, server.URL)
mustSend(t, exporter, `{"message": "test1"}`)
mustSend(t, exporter, `{"message": "test2"}`)
mustSend(t, exporter, `{"message": "test1"}`, `{"message": "test2"}`)

rec.WaitItems(2)
})
Expand Down Expand Up @@ -392,7 +391,6 @@ func TestExporter_PushEvent(t *testing.T) {
exporter := newTestExporter(t, server.URL, func(cfg *Config) { *cfg = *testConfig })
mustSend(t, exporter, `{"message": "test1"}`)

time.Sleep(200 * time.Millisecond)
assert.Equal(t, int64(1), attempts.Load())
})
}
Expand All @@ -408,9 +406,9 @@ func TestExporter_PushEvent(t *testing.T) {
})

exporter := newTestExporter(t, server.URL)
mustSend(t, exporter, `{"message": "test1"}`)
err := send(t, exporter, `{"message": "test1"}`)
assert.ErrorContains(t, err, "flush failed")

time.Sleep(200 * time.Millisecond)
assert.Equal(t, int64(1), attempts.Load())
})

Expand Down Expand Up @@ -444,7 +442,6 @@ func TestExporter_PushEvent(t *testing.T) {
exporter := newTestExporter(t, server.URL)
mustSend(t, exporter, `{"message": "test1"}`)

time.Sleep(200 * time.Millisecond)
assert.Equal(t, int64(1), attempts.Load())
})

Expand Down Expand Up @@ -482,9 +479,8 @@ func TestExporter_PushEvent(t *testing.T) {
cfg.Retry.InitialInterval = 1 * time.Millisecond
cfg.Retry.MaxInterval = 10 * time.Millisecond
})
mustSend(t, exporter, `{"message": "test1", "idx": 0}`)
mustSend(t, exporter, `{"message": "test2", "idx": 1}`)
mustSend(t, exporter, `{"message": "test3", "idx": 2}`)
mustSend(t, exporter, `{"message": "test1", "idx": 0}`,
`{"message": "test2", "idx": 1}`, `{"message": "test3", "idx": 2}`)

wg.Wait() // <- this blocks forever if the event is not retried

Expand Down Expand Up @@ -515,8 +511,22 @@ func withTestExporterConfig(fns ...func(*Config)) func(string) *Config {
}
}

func mustSend(t *testing.T, exporter *elasticsearchLogsExporter, contents string) {
err := pushDocuments(context.TODO(), exporter.index, []byte(contents), exporter.bulkIndexer)
func send(t *testing.T, exporter *elasticsearchLogsExporter, contents ...string) error {
req := request{
bulkIndexer: exporter.bulkIndexer,
Items: nil,
}
for _, body := range contents {
req.Add(bulkIndexerItem{
Index: exporter.index,
Body: []byte(body),
})
}
return req.Export(context.TODO())
}

func mustSend(t *testing.T, exporter *elasticsearchLogsExporter, contents ...string) {
err := send(t, exporter, contents...)
require.NoError(t, err)
}

Expand All @@ -528,6 +538,13 @@ func mustSendLogsWithAttributes(t *testing.T, exporter *elasticsearchLogsExporte
logRecords := scopeLog.LogRecords().At(0)
logRecords.Body().SetStr(body)

err := exporter.pushLogRecord(context.TODO(), resSpans.Resource(), logRecords, scopeLog.Scope())
req := request{
bulkIndexer: exporter.bulkIndexer,
Items: nil,
}
item, err := exporter.logRecordToItem(context.TODO(), resSpans.Resource(), logRecords, scopeLog.Scope())
require.NoError(t, err)
req.Add(item)
err = req.Export(context.TODO())
require.NoError(t, err)
}

0 comments on commit 7fdf150

Please sign in to comment.