Skip to content

Commit

Permalink
Implement capnproto replication
Browse files Browse the repository at this point in the history
Our profiles from production show that a lot of CPU and memory in receivers
is used for unmarshaling protobuf messages. Although it is not possible to change
the remote-write format, we have the freedom to change the protocol used
for replicating timeseries data.

This commit introduces a new feature in receivers where replication can be done
using Cap'n Proto instead of gRPC + Protobuf. The advantage of the former protocol
is that deserialization is far cheaper and fields can be accessed directly from
the received message (byte slice) without allocating intermediate objects.
There is an additional cost for serialization because we have to convert from
Protobuf to the Cap'n proto format, but in our setup this still results in a net
reduction in resource usage.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Aug 22, 2024
1 parent 6737c8d commit 253f417
Show file tree
Hide file tree
Showing 21 changed files with 3,097 additions and 173 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
include .bingo/Variables.mk
include .busybox-versions

FILES_TO_FMT ?= $(shell find . -path ./vendor -prune -o -path ./internal/cortex -prune -o -name '*.go' -print)
FILES_TO_FMT ?= $(shell find . -path ./vendor -prune -o -path ./internal/cortex -prune -o -path '*.capnp.go' -prune -o -name '*.go' -print)
MD_FILES_TO_FORMAT = $(shell find docs -name "*.md") $(shell find examples -name "*.md") $(filter-out mixin/runbook.md, $(shell find mixin -name "*.md")) $(shell ls *.md)
FAST_MD_FILES_TO_FORMAT = $(shell git diff --name-only | grep "\.md")

Expand Down
45 changes: 34 additions & 11 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ package main

import (
"context"
"fmt"
"os"
"path"
"strings"
"time"

"capnproto.org/go/capnp/v3"
"github.com/alecthomas/units"
extflag "github.com/efficientgo/tools/extkingpin"
"github.com/go-kit/log"
Expand Down Expand Up @@ -44,6 +46,7 @@ import (
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/receive"
"github.com/thanos-io/thanos/pkg/receive/writecapnp"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand Down Expand Up @@ -259,6 +262,7 @@ func runReceive(
Limiter: limiter,

AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
UseCapNProtoReplication: conf.useCapNProtoReplication,
})

grpcProbe := prober.NewGRPC()
Expand Down Expand Up @@ -452,6 +456,19 @@ func runReceive(
}
}

{
handler := receive.NewCapNProtoHandler(logger, writer)
writeClient := capnp.Client(writecapnp.Writer_ServerToClient(handler))
server := receive.NewCapNProtoServer()
g.Add(func() error {
return server.ListenAndServe(conf.replicationAddr, writeClient)
}, func(err error) {
if err := server.Shutdown(); err != nil {
level.Warn(logger).Log("msg", "Cap'n Proto server did not shut down gracefully", "err", err.Error())
}
})
}

level.Info(logger).Log("msg", "starting receiver")
return nil
}
Expand Down Expand Up @@ -782,6 +799,7 @@ type receiveConfig struct {

grpcConfig grpcConfig

replicationAddr string
rwAddress string
rwServerCert string
rwServerKey string
Expand All @@ -803,17 +821,18 @@ type receiveConfig struct {
hashringsFileContent string
hashringsAlgorithm string

refreshInterval *model.Duration
endpoint string
tenantHeader string
tenantField string
tenantLabelName string
defaultTenantID string
replicaHeader string
replicationFactor uint64
forwardTimeout *model.Duration
maxBackoff *model.Duration
compression string
refreshInterval *model.Duration
endpoint string
tenantHeader string
tenantField string
tenantLabelName string
defaultTenantID string
replicaHeader string
replicationFactor uint64
forwardTimeout *model.Duration
maxBackoff *model.Duration
compression string
useCapNProtoReplication bool

tsdbMinBlockDuration *model.Duration
tsdbMaxBlockDuration *model.Duration
Expand Down Expand Up @@ -914,6 +933,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64Var(&rc.replicationFactor)

cmd.Flag("receive.capnproto-replication", "Use Cap'n Proto for replication requests.").Default("false").BoolVar(&rc.useCapNProtoReplication)

cmd.Flag("receive.capnproto-address", "Address for the Cap'n Proto server.").Default(fmt.Sprintf("0.0.0.0:%s", receive.DefaultCapNProtoPort)).StringVar(&rc.replicationAddr)

rc.forwardTimeout = extkingpin.ModelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())

rc.maxBackoff = extkingpin.ModelDuration(cmd.Flag("receive-forward-max-backoff", "Maximum backoff for each forward fan-out request").Default("5s").Hidden())
Expand Down
19 changes: 19 additions & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,25 @@ If you are using the `hashmod` algorithm and wish to migrate to `ketama`, the si

This algorithm uses a `hashmod` function over all labels to decide which receiver is responsible for a given timeseries. This is the default algorithm due to historical reasons. However, its usage for new Receive installations is discouraged since adding new Receiver nodes leads to series churn and memory usage spikes.

### Replication protocols

By default, Receivers will replicate data using Protobuf over gRPC. Deserializing protobuf-encoded messages can often be resource intensive and cause a lot of GC pressure.
It is possible to use [Cap'N Proto](https://capnproto.org/) as the replication encoding and RPC framework.

In order to enable this mode, you can enable the `receive.capnproto-replication` flag on the receiver. Thanos will try to infer the Cap'N Proto address of each peer in
the hashring using the existing gRPC address. You can also explicitly set the Cap'N Proto as follows:
```json
[
{
"endpoints": [
{"address": "node-1:10901", "capnproto_address": "node-1:19391"},
{"address": "node-2:10901", "capnproto_address": "node-2:19391"},
{"address": "node-3:10901", "capnproto_address": "node-3:19391"}
]
}
]
```

### Hashring management and autoscaling in Kubernetes

The [Thanos Receive Controller](https://github.com/observatorium/thanos-receive-controller) project aims to automate hashring management when running Thanos in Kubernetes. In combination with the Ketama hashring algorithm, this controller can also be used to keep hashrings up to date when Receivers are scaled automatically using an HPA or [Keda](https://keda.sh/).
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ require (
)

require (
capnproto.org/go/capnp/v3 v3.0.1-alpha.2
github.com/cortexproject/promqlsmith v0.0.0-20240506042652-6cfdd9739a5e
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1
github.com/hashicorp/golang-lru/v2 v2.0.7
Expand All @@ -131,6 +132,8 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect
github.com/cilium/ebpf v0.11.0 // indirect
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b // indirect
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 // indirect
github.com/containerd/cgroups/v3 v3.0.3 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/go-licenser v0.3.1 // indirect
Expand Down
14 changes: 12 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
capnproto.org/go/capnp/v3 v3.0.1-alpha.2 h1:W/cf+XEArUSwcBBE/9wS2NpWDkM5NLQOjmzEiHZpYi0=
capnproto.org/go/capnp/v3 v3.0.1-alpha.2/go.mod h1:2vT5D2dtG8sJGEoEKU17e+j7shdaYp1Myl8X03B3hmc=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU=
Expand Down Expand Up @@ -1491,9 +1493,11 @@ github.com/cncf/xds/go v0.0.0-20230428030218-4003588d1b74/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20231109132714-523115ebc101/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20231128003011-0fa0005c9caa/go.mod h1:x/1Gn8zydmfq8dk6e9PdstVsDgu9RuyIIJqAaF//0IM=
github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50 h1:DBmgJDC9dTfkVyGgipamEh2BpGYxScCH1TOF1LL1cXc=
github.com/cncf/xds/go v0.0.0-20240318125728-8a4994d93e50/go.mod h1:5e1+Vvlzido69INQaVO6d87Qn543Xr6nooe9Kz7oBFM=
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b h1:ga8SEFjZ60pxLcmhnThWgvH2wg8376yUJmPhEH4H3kw=
github.com/cncf/xds/go v0.0.0-20240423153145-555b57ec207b/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 h1:d5EKgQfRQvO97jnISfR89AiCCCJMwMFoSxUiU0OGCRU=
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381/go.mod h1:OU76gHeRo8xrzGJU3F3I1CqX1ekM8dfJw0+wPeMwnp0=
github.com/containerd/cgroups/v3 v3.0.3 h1:S5ByHZ/h9PMe5IOQoN7E+nMc2UcLEM/V48DGDJ9kip0=
github.com/containerd/cgroups/v3 v3.0.3/go.mod h1:8HBe7V3aWGLFPd/k03swSIsGjZhHI2WzJmticMgVuz0=
github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand Down Expand Up @@ -2080,6 +2084,8 @@ github.com/ovh/go-ovh v1.5.1 h1:P8O+7H+NQuFK9P/j4sFW5C0fvSS2DnHYGPwdVCp45wI=
github.com/ovh/go-ovh v1.5.1/go.mod h1:cTVDnl94z4tl8pP1uZ/8jlVxntjSIf09bNcQ5TJSC7c=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/philhofer/fwd v1.1.2 h1:bnDivRJ1EWPjUIRXV5KfORO897HTbpFAQddBdE8t7Gw=
github.com/philhofer/fwd v1.1.2/go.mod h1:qkPdfjR2SIEbspLqpe1tO4n5yICnr2DY7mqEx2tUTP0=
github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY=
github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
github.com/phpdave11/gofpdi v1.0.13/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI=
Expand Down Expand Up @@ -2233,6 +2239,10 @@ github.com/thanos-io/promql-engine v0.0.0-20240718195911-cdbd6dfed36b h1:V06gjM1
github.com/thanos-io/promql-engine v0.0.0-20240718195911-cdbd6dfed36b/go.mod h1:Gtv7CJIxGyiGsT+bNDg4nOAsL/bVKLlpfOZUSLSyYfY=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tinylib/msgp v1.1.9 h1:SHf3yoO2sGA0veCJeCBYLHuttAVFHGm2RHgNodW7wQU=
github.com/tinylib/msgp v1.1.9/go.mod h1:BCXGB54lDD8qUEPmiG0cQQUANC4IUQyB2ItS2UDlO/k=
github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk=
github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk=
github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek=
github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
Expand Down
112 changes: 112 additions & 0 deletions pkg/receive/capnp_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package receive

import (
"context"
"net"
"sync"

"capnproto.org/go/capnp/v3"
"capnproto.org/go/capnp/v3/rpc"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

"github.com/thanos-io/thanos/pkg/receive/writecapnp"
)

type CapNProtoServer struct {
mu sync.Mutex
listener net.Listener
}

func NewCapNProtoServer() *CapNProtoServer {
return &CapNProtoServer{}
}

func (c *CapNProtoServer) ListenAndServe(addr string, client capnp.Client) error {
if err := c.connect(addr); err != nil {
return err
}
for {
conn, err := c.listener.Accept()
if err != nil {
return err
}
go func() {
defer conn.Close()
rpcConn := rpc.NewConn(rpc.NewPackedStreamTransport(conn), &rpc.Options{
// The BootstrapClient is the RPC interface that will be made available
// to the remote endpoint by default.
BootstrapClient: client.AddRef(),
})
<-rpcConn.Done()
}()
}
}

func (c *CapNProtoServer) connect(addr string) error {
c.mu.Lock()
defer c.mu.Unlock()

var err error
c.listener, err = net.Listen("tcp", addr)
return err
}

func (c *CapNProtoServer) Shutdown() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.listener != nil {
return c.listener.Close()
}
return nil
}

type CapNProtoHandler struct {
writer *Writer
logger log.Logger
}

func NewCapNProtoHandler(logger log.Logger, writer *Writer) *CapNProtoHandler {
return &CapNProtoHandler{logger: logger, writer: writer}
}

func (c CapNProtoHandler) Write(ctx context.Context, call writecapnp.Writer_write) error {
wr, err := call.Args().Wr()
if err != nil {
return err
}
t, err := wr.Tenant()
if err != nil {
return err
}

var errs writeErrors
errs.Add(c.writer.Write(ctx, t, writecapnp.NewWriteableRequest(wr), false))
if err := errs.ErrOrNil(); err != nil {
level.Debug(c.logger).Log("msg", "failed to handle request", "err", err)
result, allocErr := call.AllocResults()
if allocErr != nil {
return allocErr
}

switch errors.Cause(err) {
case nil:
return nil
case errNotReady:
result.SetError(writecapnp.WriteError_unavailable)
case errUnavailable:
result.SetError(writecapnp.WriteError_unavailable)
case errConflict:
result.SetError(writecapnp.WriteError_alreadyExists)
case errBadReplica:
result.SetError(writecapnp.WriteError_invalidArgument)
default:
result.SetError(writecapnp.WriteError_internal)
}
}
return nil
}
48 changes: 41 additions & 7 deletions pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"crypto/md5"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"

"github.com/fsnotify/fsnotify"
Expand All @@ -36,14 +38,44 @@ const (
RouterOnly ReceiverMode = "RouterOnly"
IngestorOnly ReceiverMode = "IngestorOnly"
RouterIngestor ReceiverMode = "RouterIngestor"

DefaultCapNProtoPort string = "19391"
)

type Endpoint struct {
Address string `json:"address"`
AZ string `json:"az"`
Address string `json:"address"`
CapNProtoAddress string `json:"capnproto_address"`
AZ string `json:"az"`
}

func (e *Endpoint) String() string {
return fmt.Sprintf("addr: %s, capnp_addr: %s, az: %s", e.Address, e.CapNProtoAddress, e.AZ)
}

func (e *Endpoint) HasAddress(addr string) bool {
return e.Address == addr || e.CapNProtoAddress == addr
}

func (e *Endpoint) UnmarshalJSON(data []byte) error {
if err := e.unmarshal(data); err != nil {
return err
}
if e.Address == "" {
return errors.New("endpoint address must be set")
}

// If the Cap'n proto address is not set, initialize it
// to the existing address using the default cap'n proto server port.
if e.CapNProtoAddress != "" {
return nil
}
if parts := strings.SplitN(e.Address, ":", 2); len(parts) <= 2 {
e.CapNProtoAddress = parts[0] + ":" + DefaultCapNProtoPort
}
return nil
}

func (e *Endpoint) unmarshal(data []byte) error {
// First try to unmarshal as a string.
err := json.Unmarshal(data, &e.Address)
if err == nil {
Expand All @@ -53,12 +85,14 @@ func (e *Endpoint) UnmarshalJSON(data []byte) error {
// If that fails, try to unmarshal as an endpoint object.
type endpointAlias Endpoint
var configEndpoint endpointAlias
err = json.Unmarshal(data, &configEndpoint)
if err == nil {
e.Address = configEndpoint.Address
e.AZ = configEndpoint.AZ
if err := json.Unmarshal(data, &configEndpoint); err != nil {
return err
}
return err

e.Address = configEndpoint.Address
e.AZ = configEndpoint.AZ
e.CapNProtoAddress = configEndpoint.CapNProtoAddress
return nil
}

// HashringConfig represents the configuration for a hashring
Expand Down
Loading

0 comments on commit 253f417

Please sign in to comment.