From 0db9111d49061c9dec898afd1022d5c6dc13cd7d Mon Sep 17 00:00:00 2001 From: akats7 Date: Thu, 1 Feb 2024 14:23:01 -0500 Subject: [PATCH] Added skip for flaky test --- connector/failoverconnector/logs_test.go | 97 ++++++++++---------- connector/failoverconnector/metrics_test.go | 99 +++++++++++---------- connector/failoverconnector/traces_test.go | 99 +++++++++++---------- 3 files changed, 149 insertions(+), 146 deletions(-) diff --git a/connector/failoverconnector/logs_test.go b/connector/failoverconnector/logs_test.go index 7d555df93dcb..20db8b3ea491 100644 --- a/connector/failoverconnector/logs_test.go +++ b/connector/failoverconnector/logs_test.go @@ -136,54 +136,55 @@ func TestLogsWithFailoverError(t *testing.T) { assert.EqualError(t, conn.ConsumeLogs(context.Background(), ld), "All provided pipelines return errors") } -// func TestLogsWithFailoverRecovery(t *testing.T) { -// var sinkFirst, sinkSecond, sinkThird consumertest.LogsSink -// logsFirst := component.NewIDWithName(component.DataTypeLogs, "logs/first") -// logsSecond := component.NewIDWithName(component.DataTypeLogs, "logs/second") -// logsThird := component.NewIDWithName(component.DataTypeLogs, "logs/third") -// -// cfg := &Config{ -// PipelinePriority: [][]component.ID{{logsFirst}, {logsSecond}, {logsThird}}, -// RetryInterval: 50 * time.Millisecond, -// RetryGap: 10 * time.Millisecond, -// MaxRetries: 1000, -// } -// -// router := connector.NewLogsRouter(map[component.ID]consumer.Logs{ -// logsFirst: &sinkFirst, -// logsSecond: &sinkSecond, -// logsThird: &sinkThird, -// }) -// -// conn, err := NewFactory().CreateLogsToLogs(context.Background(), -// connectortest.NewNopCreateSettings(), cfg, router.(consumer.Logs)) -// -// require.NoError(t, err) -// -// failoverConnector := conn.(*logsFailover) -// failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errLogsConsumer)) -// defer func() { -// assert.NoError(t, failoverConnector.Shutdown(context.Background())) -// }() -// -// ld := sampleLog() -// -// require.NoError(t, conn.ConsumeLogs(context.Background(), ld)) -// _, ch, ok := failoverConnector.failover.getCurrentConsumer() -// idx := failoverConnector.failover.pS.ChannelIndex(ch) -// -// assert.True(t, ok) -// require.Equal(t, idx, 1) -// -// // Simulate recovery of exporter -// failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewNop()) -// -// require.Eventually(t, func() bool { -// _, ch, ok = failoverConnector.failover.getCurrentConsumer() -// idx = failoverConnector.failover.pS.ChannelIndex(ch) -// return ok && idx == 0 -// }, 3*time.Second, 100*time.Millisecond) -//} +func TestLogsWithFailoverRecovery(t *testing.T) { + t.Skip("Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31005") + var sinkFirst, sinkSecond, sinkThird consumertest.LogsSink + logsFirst := component.NewIDWithName(component.DataTypeLogs, "logs/first") + logsSecond := component.NewIDWithName(component.DataTypeLogs, "logs/second") + logsThird := component.NewIDWithName(component.DataTypeLogs, "logs/third") + + cfg := &Config{ + PipelinePriority: [][]component.ID{{logsFirst}, {logsSecond}, {logsThird}}, + RetryInterval: 50 * time.Millisecond, + RetryGap: 10 * time.Millisecond, + MaxRetries: 1000, + } + + router := connector.NewLogsRouter(map[component.ID]consumer.Logs{ + logsFirst: &sinkFirst, + logsSecond: &sinkSecond, + logsThird: &sinkThird, + }) + + conn, err := NewFactory().CreateLogsToLogs(context.Background(), + connectortest.NewNopCreateSettings(), cfg, router.(consumer.Logs)) + + require.NoError(t, err) + + failoverConnector := conn.(*logsFailover) + failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errLogsConsumer)) + defer func() { + assert.NoError(t, failoverConnector.Shutdown(context.Background())) + }() + + ld := sampleLog() + + require.NoError(t, conn.ConsumeLogs(context.Background(), ld)) + _, ch, ok := failoverConnector.failover.getCurrentConsumer() + idx := failoverConnector.failover.pS.ChannelIndex(ch) + + assert.True(t, ok) + require.Equal(t, idx, 1) + + // Simulate recovery of exporter + failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewNop()) + + require.Eventually(t, func() bool { + _, ch, ok = failoverConnector.failover.getCurrentConsumer() + idx = failoverConnector.failover.pS.ChannelIndex(ch) + return ok && idx == 0 + }, 3*time.Second, 100*time.Millisecond) +} func sampleLog() plog.Logs { l := plog.NewLogs() diff --git a/connector/failoverconnector/metrics_test.go b/connector/failoverconnector/metrics_test.go index 12c261a6a155..2ba469a6cebf 100644 --- a/connector/failoverconnector/metrics_test.go +++ b/connector/failoverconnector/metrics_test.go @@ -136,55 +136,56 @@ func TestMetricsWithFailoverError(t *testing.T) { assert.EqualError(t, conn.ConsumeMetrics(context.Background(), md), "All provided pipelines return errors") } -// func TestMetricsWithFailoverRecovery(t *testing.T) { -// var sinkSecond, sinkThird consumertest.MetricsSink -// metricsFirst := component.NewIDWithName(component.DataTypeMetrics, "metrics/first") -// metricsSecond := component.NewIDWithName(component.DataTypeMetrics, "metrics/second") -// metricsThird := component.NewIDWithName(component.DataTypeMetrics, "metrics/third") -// noOp := consumertest.NewNop() -// -// cfg := &Config{ -// PipelinePriority: [][]component.ID{{metricsFirst}, {metricsSecond}, {metricsThird}}, -// RetryInterval: 50 * time.Millisecond, -// RetryGap: 10 * time.Millisecond, -// MaxRetries: 1000, -// } -// -// router := connector.NewMetricsRouter(map[component.ID]consumer.Metrics{ -// metricsFirst: noOp, -// metricsSecond: &sinkSecond, -// metricsThird: &sinkThird, -// }) -// -// conn, err := NewFactory().CreateMetricsToMetrics(context.Background(), -// connectortest.NewNopCreateSettings(), cfg, router.(consumer.Metrics)) -// -// require.NoError(t, err) -// -// failoverConnector := conn.(*metricsFailover) -// failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errMetricsConsumer)) -// defer func() { -// assert.NoError(t, failoverConnector.Shutdown(context.Background())) -// }() -// -// md := sampleMetric() -// -// require.NoError(t, conn.ConsumeMetrics(context.Background(), md)) -// _, ch, ok := failoverConnector.failover.getCurrentConsumer() -// idx := failoverConnector.failover.pS.ChannelIndex(ch) -// -// assert.True(t, ok) -// require.Equal(t, idx, 1) -// -// // Simulate recovery of exporter -// failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewNop()) -// -// require.Eventually(t, func() bool { -// _, ch, ok = failoverConnector.failover.getCurrentConsumer() -// idx = failoverConnector.failover.pS.ChannelIndex(ch) -// return ok && idx == 0 -// }, 3*time.Second, 100*time.Millisecond) -//} +func TestMetricsWithFailoverRecovery(t *testing.T) { + t.Skip("Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31005") + var sinkSecond, sinkThird consumertest.MetricsSink + metricsFirst := component.NewIDWithName(component.DataTypeMetrics, "metrics/first") + metricsSecond := component.NewIDWithName(component.DataTypeMetrics, "metrics/second") + metricsThird := component.NewIDWithName(component.DataTypeMetrics, "metrics/third") + noOp := consumertest.NewNop() + + cfg := &Config{ + PipelinePriority: [][]component.ID{{metricsFirst}, {metricsSecond}, {metricsThird}}, + RetryInterval: 50 * time.Millisecond, + RetryGap: 10 * time.Millisecond, + MaxRetries: 1000, + } + + router := connector.NewMetricsRouter(map[component.ID]consumer.Metrics{ + metricsFirst: noOp, + metricsSecond: &sinkSecond, + metricsThird: &sinkThird, + }) + + conn, err := NewFactory().CreateMetricsToMetrics(context.Background(), + connectortest.NewNopCreateSettings(), cfg, router.(consumer.Metrics)) + + require.NoError(t, err) + + failoverConnector := conn.(*metricsFailover) + failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errMetricsConsumer)) + defer func() { + assert.NoError(t, failoverConnector.Shutdown(context.Background())) + }() + + md := sampleMetric() + + require.NoError(t, conn.ConsumeMetrics(context.Background(), md)) + _, ch, ok := failoverConnector.failover.getCurrentConsumer() + idx := failoverConnector.failover.pS.ChannelIndex(ch) + + assert.True(t, ok) + require.Equal(t, idx, 1) + + // Simulate recovery of exporter + failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewNop()) + + require.Eventually(t, func() bool { + _, ch, ok = failoverConnector.failover.getCurrentConsumer() + idx = failoverConnector.failover.pS.ChannelIndex(ch) + return ok && idx == 0 + }, 3*time.Second, 100*time.Millisecond) +} func sampleMetric() pmetric.Metrics { m := pmetric.NewMetrics() diff --git a/connector/failoverconnector/traces_test.go b/connector/failoverconnector/traces_test.go index cd97cad50099..b6bc2329f841 100644 --- a/connector/failoverconnector/traces_test.go +++ b/connector/failoverconnector/traces_test.go @@ -139,55 +139,56 @@ func TestTracesWithFailoverError(t *testing.T) { assert.EqualError(t, conn.ConsumeTraces(context.Background(), tr), "All provided pipelines return errors") } -// func TestTracesWithFailoverRecovery(t *testing.T) { -// var sinkSecond, sinkThird consumertest.TracesSink -// tracesFirst := component.NewIDWithName(component.DataTypeTraces, "traces/first") -// tracesSecond := component.NewIDWithName(component.DataTypeTraces, "traces/second") -// tracesThird := component.NewIDWithName(component.DataTypeTraces, "traces/third") -// noOp := consumertest.NewNop() -// -// cfg := &Config{ -// PipelinePriority: [][]component.ID{{tracesFirst}, {tracesSecond}, {tracesThird}}, -// RetryInterval: 50 * time.Millisecond, -// RetryGap: 10 * time.Millisecond, -// MaxRetries: 1000, -// } -// -// router := connector.NewTracesRouter(map[component.ID]consumer.Traces{ -// tracesFirst: noOp, -// tracesSecond: &sinkSecond, -// tracesThird: &sinkThird, -// }) -// -// conn, err := NewFactory().CreateTracesToTraces(context.Background(), -// connectortest.NewNopCreateSettings(), cfg, router.(consumer.Traces)) -// -// require.NoError(t, err) -// -// failoverConnector := conn.(*tracesFailover) -// failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer)) -// defer func() { -// assert.NoError(t, failoverConnector.Shutdown(context.Background())) -// }() -// -// tr := sampleTrace() -// -// require.NoError(t, conn.ConsumeTraces(context.Background(), tr)) -// _, ch, ok := failoverConnector.failover.getCurrentConsumer() -// idx := failoverConnector.failover.pS.ChannelIndex(ch) -// -// assert.True(t, ok) -// require.Equal(t, idx, 1) -// -// // Simulate recovery of exporter -// failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewNop()) -// -// require.Eventually(t, func() bool { -// _, ch, ok = failoverConnector.failover.getCurrentConsumer() -// idx = failoverConnector.failover.pS.ChannelIndex(ch) -// return ok && idx == 0 -// }, 3*time.Second, 100*time.Millisecond) -//} +func TestTracesWithFailoverRecovery(t *testing.T) { + t.Skip("Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31005") + var sinkSecond, sinkThird consumertest.TracesSink + tracesFirst := component.NewIDWithName(component.DataTypeTraces, "traces/first") + tracesSecond := component.NewIDWithName(component.DataTypeTraces, "traces/second") + tracesThird := component.NewIDWithName(component.DataTypeTraces, "traces/third") + noOp := consumertest.NewNop() + + cfg := &Config{ + PipelinePriority: [][]component.ID{{tracesFirst}, {tracesSecond}, {tracesThird}}, + RetryInterval: 50 * time.Millisecond, + RetryGap: 10 * time.Millisecond, + MaxRetries: 1000, + } + + router := connector.NewTracesRouter(map[component.ID]consumer.Traces{ + tracesFirst: noOp, + tracesSecond: &sinkSecond, + tracesThird: &sinkThird, + }) + + conn, err := NewFactory().CreateTracesToTraces(context.Background(), + connectortest.NewNopCreateSettings(), cfg, router.(consumer.Traces)) + + require.NoError(t, err) + + failoverConnector := conn.(*tracesFailover) + failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer)) + defer func() { + assert.NoError(t, failoverConnector.Shutdown(context.Background())) + }() + + tr := sampleTrace() + + require.NoError(t, conn.ConsumeTraces(context.Background(), tr)) + _, ch, ok := failoverConnector.failover.getCurrentConsumer() + idx := failoverConnector.failover.pS.ChannelIndex(ch) + + assert.True(t, ok) + require.Equal(t, idx, 1) + + // Simulate recovery of exporter + failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewNop()) + + require.Eventually(t, func() bool { + _, ch, ok = failoverConnector.failover.getCurrentConsumer() + idx = failoverConnector.failover.pS.ChannelIndex(ch) + return ok && idx == 0 + }, 3*time.Second, 100*time.Millisecond) +} func sampleTrace() ptrace.Traces { tr := ptrace.NewTraces()