Skip to content

Commit

Permalink
e2e: add receive test with capnp replication
Browse files Browse the repository at this point in the history
Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Oct 16, 2024
1 parent e80d842 commit 05cb1fc
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 30 deletions.
29 changes: 0 additions & 29 deletions pkg/receive/capnp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,35 +103,6 @@ func (c CapNProtoHandler) Write(ctx context.Context, call writecapnp.Writer_writ
result.SetError(writecapnp.WriteError_internal)
}
}
return nil
}

type BufferedListener struct {
ctx context.Context
cancel context.CancelFunc

conns chan net.Conn
}

func (b BufferedListener) Accept() (net.Conn, error) {
select {
case <-b.ctx.Done():
return nil, b.ctx.Err()
case c := <-b.conns:
return c, nil
}
}

func (b BufferedListener) Close() error {
b.cancel()
return nil
}

func (b BufferedListener) Addr() net.Addr {
return addr{}
}

type addr struct{}

func (addr) Network() string { return "bufconn" }
func (addr) String() string { return "bufconn" }
12 changes: 11 additions & 1 deletion test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ type ReceiveBuilder struct {
f e2e.FutureRunnable

maxExemplars int
capnp bool
ingestion bool
limit int
tenantsLimits receive.TenantsWriteLimitsConfig
Expand All @@ -555,7 +556,7 @@ type ReceiveBuilder struct {

func NewReceiveBuilder(e e2e.Environment, name string) *ReceiveBuilder {
f := e.Runnable(fmt.Sprintf("receive-%v", name)).
WithPorts(map[string]int{"http": 8080, "grpc": 9091, "remote-write": 8081}).
WithPorts(map[string]int{"http": 8080, "grpc": 9091, "remote-write": 8081, "capnp": 19391}).
Future()
return &ReceiveBuilder{
Linkable: f,
Expand Down Expand Up @@ -586,6 +587,11 @@ func (r *ReceiveBuilder) WithLabel(name, value string) *ReceiveBuilder {
return r
}

func (r *ReceiveBuilder) UseCapnpReplication() *ReceiveBuilder {
r.capnp = true
return r
}

func (r *ReceiveBuilder) WithRouting(replication int, hashringConfigs ...receive.HashringConfig) *ReceiveBuilder {
r.hashringConfigs = hashringConfigs
r.replication = replication
Expand Down Expand Up @@ -646,6 +652,10 @@ func (r *ReceiveBuilder) Init() *e2eobs.Observable {
args["--label"] = fmt.Sprintf("%s,%s", args["--label"], strings.Join(r.labels, ","))
}

if r.capnp {
args["--receive.capnproto-replication"] = ""
}

hashring := r.hashringConfigs
if len(hashring) > 0 && r.ingestion {
args["--receive.local-endpoint"] = r.InternalEndpoint("grpc")
Expand Down
43 changes: 43 additions & 0 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/receive"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/test/e2e/e2ethanos"
)

Expand Down Expand Up @@ -1152,3 +1153,45 @@ func TestReceiveExtractsTenant(t *testing.T) {

})
}

func TestReceiveCpnp(t *testing.T) {
e, err := e2e.NewDockerEnvironment("receive-cpnp")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i))

h := receive.HashringConfig{
TenantMatcherType: "glob",
Tenants: []string{
"default*",
},
Endpoints: []receive.Endpoint{
{Address: i.InternalEndpoint("grpc"), CapNProtoAddress: i.InternalEndpoint("capnp")},
},
}

r := e2ethanos.NewReceiveBuilder(e, "router").WithRouting(1, h).UseCapnpReplication().Init()
testutil.Ok(t, e2e.StartAndWaitReady(r))

q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))

require.NoError(t, runutil.RetryWithLog(logkit.NewLogfmtLogger(os.Stdout), 1*time.Second, make(<-chan struct{}), func() error {
return storeWriteRequest(context.Background(), "http://"+r.Endpoint("remote-write")+"/api/v1/receive", &prompb.WriteRequest{
Timeseries: []*prompb.TimeSeries{

Check failure on line 1183 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

cannot use []*prompb.TimeSeries{…} (value of type []*"github.com/prometheus/prometheus/prompb".TimeSeries) as []"github.com/prometheus/prometheus/prompb".TimeSeries value in struct literal

Check failure on line 1183 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

cannot use []*prompb.TimeSeries{…} (value of type []*"github.com/prometheus/prometheus/prompb".TimeSeries) as []"github.com/prometheus/prometheus/prompb".TimeSeries value in struct literal

Check failure on line 1183 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 0)

cannot use []*prompb.TimeSeries{…} (value of type []*"github.com/prometheus/prometheus/prompb".TimeSeries) as []"github.com/prometheus/prometheus/prompb".TimeSeries value in struct literal

Check failure on line 1183 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 2)

cannot use []*prompb.TimeSeries{…} (value of type []*"github.com/prometheus/prometheus/prompb".TimeSeries) as []"github.com/prometheus/prometheus/prompb".TimeSeries value in struct literal

Check failure on line 1183 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 4)

cannot use []*prompb.TimeSeries{…} (value of type []*"github.com/prometheus/prometheus/prompb".TimeSeries) as []"github.com/prometheus/prometheus/prompb".TimeSeries value in struct literal

Check failure on line 1183 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 3)

cannot use []*prompb.TimeSeries{…} (value of type []*"github.com/prometheus/prometheus/prompb".TimeSeries) as []"github.com/prometheus/prometheus/prompb".TimeSeries value in struct literal

Check failure on line 1183 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 1)

cannot use []*prompb.TimeSeries{…} (value of type []*"github.com/prometheus/prometheus/prompb".TimeSeries) as []"github.com/prometheus/prometheus/prompb".TimeSeries value in struct literal

Check failure on line 1183 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 5)

cannot use []*prompb.TimeSeries{…} (value of type []*"github.com/prometheus/prometheus/prompb".TimeSeries) as []"github.com/prometheus/prometheus/prompb".TimeSeries value in struct literal

Check failure on line 1183 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 6)

cannot use []*prompb.TimeSeries{…} (value of type []*"github.com/prometheus/prometheus/prompb".TimeSeries) as []"github.com/prometheus/prometheus/prompb".TimeSeries value in struct literal

Check failure on line 1183 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 7)

cannot use []*prompb.TimeSeries{…} (value of type []*"github.com/prometheus/prometheus/prompb".TimeSeries) as []"github.com/prometheus/prometheus/prompb".TimeSeries value in struct literal
{
Labels: []*labelpb.Label{

Check failure on line 1185 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

cannot use []*labelpb.Label{…} (value of type []*labelpb.Label) as []"github.com/prometheus/prometheus/prompb".Label value in struct literal

Check failure on line 1185 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

cannot use []*labelpb.Label{…} (value of type []*labelpb.Label) as []"github.com/prometheus/prometheus/prompb".Label value in struct literal

Check failure on line 1185 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 0)

cannot use []*labelpb.Label{…} (value of type []*labelpb.Label) as []"github.com/prometheus/prometheus/prompb".Label value in struct literal

Check failure on line 1185 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 2)

cannot use []*labelpb.Label{…} (value of type []*labelpb.Label) as []"github.com/prometheus/prometheus/prompb".Label value in struct literal

Check failure on line 1185 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 4)

cannot use []*labelpb.Label{…} (value of type []*labelpb.Label) as []"github.com/prometheus/prometheus/prompb".Label value in struct literal

Check failure on line 1185 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 3)

cannot use []*labelpb.Label{…} (value of type []*labelpb.Label) as []"github.com/prometheus/prometheus/prompb".Label value in struct literal

Check failure on line 1185 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 1)

cannot use []*labelpb.Label{…} (value of type []*labelpb.Label) as []"github.com/prometheus/prometheus/prompb".Label value in struct literal

Check failure on line 1185 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 5)

cannot use []*labelpb.Label{…} (value of type []*labelpb.Label) as []"github.com/prometheus/prometheus/prompb".Label value in struct literal

Check failure on line 1185 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 6)

cannot use []*labelpb.Label{…} (value of type []*labelpb.Label) as []"github.com/prometheus/prometheus/prompb".Label value in struct literal

Check failure on line 1185 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 7)

cannot use []*labelpb.Label{…} (value of type []*labelpb.Label) as []"github.com/prometheus/prometheus/prompb".Label value in struct literal
{Name: "aa", Value: "bb"},
},
Samples: []*prompb.Sample{

Check failure on line 1188 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

cannot use []*prompb.Sample{…} (value of type []*"github.com/prometheus/prometheus/prompb".Sample) as []"github.com/prometheus/prometheus/prompb".Sample value in struct literal

Check failure on line 1188 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

cannot use []*prompb.Sample{…} (value of type []*"github.com/prometheus/prometheus/prompb".Sample) as []"github.com/prometheus/prometheus/prompb".Sample value in struct literal

Check failure on line 1188 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 0)

cannot use []*prompb.Sample{…} (value of type []*"github.com/prometheus/prometheus/prompb".Sample) as []"github.com/prometheus/prometheus/prompb".Sample value in struct literal

Check failure on line 1188 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 2)

cannot use []*prompb.Sample{…} (value of type []*"github.com/prometheus/prometheus/prompb".Sample) as []"github.com/prometheus/prometheus/prompb".Sample value in struct literal

Check failure on line 1188 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 4)

cannot use []*prompb.Sample{…} (value of type []*"github.com/prometheus/prometheus/prompb".Sample) as []"github.com/prometheus/prometheus/prompb".Sample value in struct literal

Check failure on line 1188 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 3)

cannot use []*prompb.Sample{…} (value of type []*"github.com/prometheus/prometheus/prompb".Sample) as []"github.com/prometheus/prometheus/prompb".Sample value in struct literal

Check failure on line 1188 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 1)

cannot use []*prompb.Sample{…} (value of type []*"github.com/prometheus/prometheus/prompb".Sample) as []"github.com/prometheus/prometheus/prompb".Sample value in struct literal

Check failure on line 1188 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 5)

cannot use []*prompb.Sample{…} (value of type []*"github.com/prometheus/prometheus/prompb".Sample) as []"github.com/prometheus/prometheus/prompb".Sample value in struct literal

Check failure on line 1188 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 6)

cannot use []*prompb.Sample{…} (value of type []*"github.com/prometheus/prometheus/prompb".Sample) as []"github.com/prometheus/prometheus/prompb".Sample value in struct literal

Check failure on line 1188 in test/e2e/receive_test.go

View workflow job for this annotation

GitHub Actions / Thanos end-to-end tests (8, 7)

cannot use []*prompb.Sample{…} (value of type []*"github.com/prometheus/prometheus/prompb".Sample) as []"github.com/prometheus/prometheus/prompb".Sample value in struct literal
{Value: 1, Timestamp: time.Now().UnixMilli()},
},
},
},
})
}))

testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", "default-tenant")), e2emon.WaitMissingMetrics()))
}

0 comments on commit 05cb1fc

Please sign in to comment.