-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement capnproto replication (#7659)
* Implement capnproto replication Our profiles from production show that a lot of CPU and memory in receivers is used for unmarshaling protobuf messages. Although it is not possible to change the remote-write format, we have the freedom to change the protocol used for replicating timeseries data. This commit introduces a new feature in receivers where replication can be done using Cap'n Proto instead of gRPC + Protobuf. The advantage of the former protocol is that deserialization is far cheaper and fields can be accessed directly from the received message (byte slice) without allocating intermediate objects. There is an additional cost for serialization because we have to convert from Protobuf to the Cap'n proto format, but in our setup this still results in a net reduction in resource usage. Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Pass logger Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Update capnp Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Modify flag Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Lint Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Fix spellcheck Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Use previous version Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Update docker base Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Bump go Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Update docs/components/receive.md Co-authored-by: Pedro Tanaka <pedro.tanaka@shopify.com> Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Validate labels Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * e2e: add receive test with capnp replication Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com> * receive: make copy only when necessary Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com> * Fix failing test Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Add CHANGELOG entry Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Add capnproto Make target Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Replace panics with errors Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Fix benchmark Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> * Fix CHANGELOG Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> --------- Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com> Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com> Co-authored-by: Pedro Tanaka <pedro.tanaka@shopify.com> Co-authored-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
- Loading branch information
1 parent
274f95e
commit 65b664c
Showing
33 changed files
with
4,163 additions
and
349 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
module _ // Auto generated by https://github.com/bwplotka/bingo. DO NOT EDIT | ||
|
||
go 1.23.1 | ||
|
||
require capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af // capnpc-go |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af h1:A5wxH0ZidOtYYUGjhtBaRuB87M73bGfc06uWB8sHpg0= | ||
capnproto.org/go/capnp/v3 v3.0.1-alpha.2.0.20240830165715-46ccd63a72af/go.mod h1:2vT5D2dtG8sJGEoEKU17e+j7shdaYp1Myl8X03B3hmc= | ||
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381 h1:d5EKgQfRQvO97jnISfR89AiCCCJMwMFoSxUiU0OGCRU= | ||
github.com/colega/zeropool v0.0.0-20230505084239-6fb4a4f75381/go.mod h1:OU76gHeRo8xrzGJU3F3I1CqX1ekM8dfJw0+wPeMwnp0= | ||
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= | ||
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package receive | ||
|
||
import ( | ||
"context" | ||
"net" | ||
|
||
"capnproto.org/go/capnp/v3" | ||
"capnproto.org/go/capnp/v3/rpc" | ||
"github.com/go-kit/log" | ||
"github.com/go-kit/log/level" | ||
"github.com/pkg/errors" | ||
|
||
"github.com/thanos-io/thanos/pkg/receive/writecapnp" | ||
"github.com/thanos-io/thanos/pkg/runutil" | ||
) | ||
|
||
type CapNProtoServer struct { | ||
listener net.Listener | ||
server writecapnp.Writer | ||
logger log.Logger | ||
} | ||
|
||
func NewCapNProtoServer(listener net.Listener, handler *CapNProtoHandler, logger log.Logger) *CapNProtoServer { | ||
return &CapNProtoServer{ | ||
listener: listener, | ||
server: writecapnp.Writer_ServerToClient(handler), | ||
logger: logger, | ||
} | ||
} | ||
|
||
func (c *CapNProtoServer) ListenAndServe() error { | ||
for { | ||
conn, err := c.listener.Accept() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
go func() { | ||
defer runutil.CloseWithLogOnErr(c.logger, conn, "receive capnp conn") | ||
rpcConn := rpc.NewConn(rpc.NewPackedStreamTransport(conn), &rpc.Options{ | ||
// The BootstrapClient is the RPC interface that will be made available | ||
// to the remote endpoint by default. | ||
BootstrapClient: capnp.Client(c.server).AddRef(), | ||
}) | ||
<-rpcConn.Done() | ||
}() | ||
} | ||
} | ||
|
||
func (c *CapNProtoServer) Shutdown() { | ||
c.server.Release() | ||
} | ||
|
||
type CapNProtoHandler struct { | ||
writer *CapNProtoWriter | ||
logger log.Logger | ||
} | ||
|
||
func NewCapNProtoHandler(logger log.Logger, writer *CapNProtoWriter) *CapNProtoHandler { | ||
return &CapNProtoHandler{logger: logger, writer: writer} | ||
} | ||
|
||
func (c CapNProtoHandler) Write(ctx context.Context, call writecapnp.Writer_write) error { | ||
call.Go() | ||
wr, err := call.Args().Wr() | ||
if err != nil { | ||
return err | ||
} | ||
t, err := wr.Tenant() | ||
if err != nil { | ||
return err | ||
} | ||
req, err := writecapnp.NewRequest(wr) | ||
if err != nil { | ||
return err | ||
} | ||
defer req.Close() | ||
|
||
var errs writeErrors | ||
errs.Add(c.writer.Write(ctx, t, req)) | ||
if err := errs.ErrOrNil(); err != nil { | ||
level.Debug(c.logger).Log("msg", "failed to handle request", "err", err) | ||
result, allocErr := call.AllocResults() | ||
if allocErr != nil { | ||
return allocErr | ||
} | ||
|
||
switch errors.Cause(err) { | ||
case nil: | ||
return nil | ||
case errNotReady: | ||
result.SetError(writecapnp.WriteError_unavailable) | ||
case errUnavailable: | ||
result.SetError(writecapnp.WriteError_unavailable) | ||
case errConflict: | ||
result.SetError(writecapnp.WriteError_alreadyExists) | ||
case errBadReplica: | ||
result.SetError(writecapnp.WriteError_invalidArgument) | ||
default: | ||
result.SetError(writecapnp.WriteError_internal) | ||
} | ||
} | ||
|
||
return nil | ||
} |
Oops, something went wrong.