diff --git a/deploy/lib/parca/parca.libsonnet b/deploy/lib/parca/parca.libsonnet index 1e75c5e1ba1..db8dec29ee5 100644 --- a/deploy/lib/parca/parca.libsonnet +++ b/deploy/lib/parca/parca.libsonnet @@ -218,7 +218,8 @@ function(params) { else ['--cors-allowed-origins=' + prc.config.corsAllowedOrigins]) + (if prc.config.storageRetentionTime == '' then [] else ['--storage-tsdb-retention-time=' + prc.config.storageRetentionTime]) + - ['--debug-infod-upstream-servers=' + std.join(',', prc.config.debugInfodUpstreamServers)] + + (if std.length(prc.config.debugInfodUpstreamServers) <= 0 then [] + else ['--debug-infod-upstream-servers=' + std.join(',', prc.config.debugInfodUpstreamServers)]) + (if prc.config.debugInfodHTTPRequestTimeout == '' then [] else ['--debug-infod-http-request-timeout=' + prc.config.debugInfodHTTPRequestTimeout]), ports: [ diff --git a/gen/proto/go/parca/debuginfo/v1alpha1/debuginfo.pb.go b/gen/proto/go/parca/debuginfo/v1alpha1/debuginfo.pb.go index dba098dec2a..7d6e53994b6 100644 --- a/gen/proto/go/parca/debuginfo/v1alpha1/debuginfo.pb.go +++ b/gen/proto/go/parca/debuginfo/v1alpha1/debuginfo.pb.go @@ -220,7 +220,7 @@ type UploadInfo struct { // build_id is a unique identifier for the debug data BuildId string `protobuf:"bytes,1,opt,name=build_id,json=buildId,proto3" json:"build_id,omitempty"` - // hash is the hash of the debug information file + // hash is the hash of the source file that the debug information was extracted from Hash string `protobuf:"bytes,2,opt,name=hash,proto3" json:"hash,omitempty"` } diff --git a/gen/proto/swagger/parca/debuginfo/v1alpha1/debuginfo.swagger.json b/gen/proto/swagger/parca/debuginfo/v1alpha1/debuginfo.swagger.json index 6ec41b83c3f..8ecf481d5b2 100644 --- a/gen/proto/swagger/parca/debuginfo/v1alpha1/debuginfo.swagger.json +++ b/gen/proto/swagger/parca/debuginfo/v1alpha1/debuginfo.swagger.json @@ -63,7 +63,7 @@ }, "hash": { "type": "string", - "title": "hash is the hash of the debug information file" + "title": "hash is the hash of the source file that the debug information was extracted from" } }, "title": "UploadInfo contains the build_id and other metadata for the debug data" diff --git a/go.mod b/go.mod index 8e5271ec441..0bd4ef3dd7d 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3 github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2 github.com/improbable-eng/grpc-web v0.15.0 + github.com/nanmu42/limitio v1.0.0 github.com/oklog/run v1.1.0 github.com/polarsignals/frostdb v0.0.0-20220627125235-571b4dc57775 github.com/prometheus/client_golang v1.12.2 diff --git a/go.sum b/go.sum index 6fb2d1a5cbe..dfe55b3c064 100644 --- a/go.sum +++ b/go.sum @@ -780,6 +780,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/grpc-proxy v0.0.0-20181017164139-0f1106ef9c76/go.mod h1:x5OoJHDHqxHS801UIuhqGl6QdSAEJvtausosHSdazIo= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= +github.com/nanmu42/limitio v1.0.0 h1:dpopBYPwUyLOPv+vsGja0iax+dG0SP9paTEmz+Sy7KU= +github.com/nanmu42/limitio v1.0.0/go.mod h1:8H40zQ7pqxzbwZ9jxsK2hDoE06TH5ziybtApt1io8So= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= github.com/nats-io/nats-server/v2 v2.1.2/go.mod
h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= diff --git a/pkg/debuginfo/debuginfod.go b/pkg/debuginfo/debuginfod.go index a257e5b68f8..852e9255275 100644 --- a/pkg/debuginfo/debuginfod.go +++ b/pkg/debuginfo/debuginfod.go @@ -119,6 +119,7 @@ func (c *DebugInfodClientObjectStorageCache) GetDebugInfo(ctx context.Context, b defer w.Close() defer debugInfo.Close() + // TODO(kakkoyun): Use store.upload() to upload the debug info to object storage. if err := c.bucket.Upload(ctx, objectPath(buildID), r); err != nil { level.Error(logger).Log("msg", "failed to upload downloaded debuginfod file", "err", err) } @@ -142,15 +143,15 @@ func (c *HTTPDebugInfodClient) GetDebugInfo(ctx context.Context, buildID string) logger := log.With(c.logger, "buildid", buildID) // e.g: - //"https://debuginfod.elfutils.org/" - //"https://debuginfod.systemtap.org/" - //"https://debuginfod.opensuse.org/" - //"https://debuginfod.s.voidlinux.org/" - //"https://debuginfod.debian.net/" - //"https://debuginfod.fedoraproject.org/" - //"https://debuginfod.altlinux.org/" - //"https://debuginfod.archlinux.org/" - //"https://debuginfod.centos.org/" + // "https://debuginfod.elfutils.org/" + // "https://debuginfod.systemtap.org/" + // "https://debuginfod.opensuse.org/" + // "https://debuginfod.s.voidlinux.org/" + // "https://debuginfod.debian.net/" + // "https://debuginfod.fedoraproject.org/" + // "https://debuginfod.altlinux.org/" + // "https://debuginfod.archlinux.org/" + // "https://debuginfod.centos.org/" for _, u := range c.UpstreamServers { serverURL := *u rc, err := func(serverURL url.URL) (io.ReadCloser, error) { diff --git a/pkg/debuginfo/metadata.go b/pkg/debuginfo/metadata.go index 0783956acca..55d4138b74a 100644 --- a/pkg/debuginfo/metadata.go +++ b/pkg/debuginfo/metadata.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "path" "time" @@ -27,36 +28,37 @@ import ( ) var ( - ErrMetadataShouldExist = errors.New("debug info metadata should exist") - ErrMetadataExpectedStateUploading = errors.New("debug info metadata state should be uploading") - ErrMetadataUnknownState = errors.New("debug info metadata state is unknown") + ErrMetadataShouldExist = errors.New("debug info metadata should exist") + ErrMetadataUnexpectedState = errors.New("debug info metadata state is unexpected") + // There's no debug info metadata. This could mean that an older version + // uploaded the debug info files, but there's no record of the metadata, yet. + ErrMetadataNotFound = errors.New("debug info metadata not found") ) type metadataState int64 const ( metadataStateUnknown metadataState = iota - // There's no debug info metadata. This could mean that an older version - // uploaded the debug info files, but there's no record of the metadata, yet. - metadataStateEmpty // The debug info file is being uploaded. metadataStateUploading // The debug info file is fully uploaded. metadataStateUploaded + // The debug info file is corrupted. 
+ metadataStateCorrupted ) var mdStateStr = map[metadataState]string{ metadataStateUnknown: "METADATA_STATE_UNKNOWN", - metadataStateEmpty: "METADATA_STATE_EMPTY", metadataStateUploading: "METADATA_STATE_UPLOADING", metadataStateUploaded: "METADATA_STATE_UPLOADED", + metadataStateCorrupted: "METADATA_STATE_CORRUPTED", } var strMdState = map[string]metadataState{ "METADATA_STATE_UNKNOWN": metadataStateUnknown, - "METADATA_STATE_EMPTY": metadataStateEmpty, "METADATA_STATE_UPLOADING": metadataStateUploading, "METADATA_STATE_UPLOADED": metadataStateUploaded, + "METADATA_STATE_CORRUPTED": metadataStateCorrupted, } func (m metadataState) String() string { @@ -96,98 +98,120 @@ func newMetadataManager(logger log.Logger, bucket objstore.Bucket) *metadataMana type metadata struct { State metadataState `json:"state"` - StartedUploadAt int64 `json:"started_upload_at"` - FinishedUploadAt int64 `json:"finished_upload_at"` + BuildID string `json:"build_id"` + Hash string `json:"hash"` + UploadStartedAt int64 `json:"upload_started_at"` + UploadFinishedAt int64 `json:"upload_finished_at"` } -func (m *metadataManager) update(ctx context.Context, buildID string, state metadataState) error { - level.Debug(m.logger).Log("msg", "attempting state update to", "state", state) - - switch state { - case metadataStateUploading: - _, err := m.bucket.Get(ctx, metadataObjectPath(buildID)) - // The metadata file should not exist yet. Not erroring here because there's - // room for a race condition. - if err == nil { - level.Info(m.logger).Log("msg", "there should not be a metadata file") - return nil - } +func (m *metadataManager) markAsCorrupted(ctx context.Context, buildID string) error { + if err := m.write(ctx, buildID, &metadata{ + State: metadataStateCorrupted, + }); err != nil { + return fmt.Errorf("failed to write metadata: %w", err) + } + level.Debug(m.logger).Log("msg", "marked as corrupted", "buildid", buildID) + return nil +} - if !m.bucket.IsObjNotFoundErr(err) { - level.Error(m.logger).Log("msg", "unexpected error", "err", err) - return err - } +func (m *metadataManager) markAsUploading(ctx context.Context, buildID string) error { + _, err := m.bucket.Get(ctx, metadataObjectPath(buildID)) + // The metadata file should not exist yet. Not erroring here because there's + // room for a race condition. + if err == nil { + level.Info(m.logger).Log("msg", "there should not be a metadata file") + return nil + } - // Let's write the metadata. 
- metadataBytes, _ := json.MarshalIndent(&metadata{ - State: metadataStateUploading, - StartedUploadAt: time.Now().Unix(), - }, "", "\t") - r := bytes.NewReader(metadataBytes) - if err := m.bucket.Upload(ctx, metadataObjectPath(buildID), r); err != nil { - level.Error(m.logger).Log("msg", "failed to create metadata file", "err", err) - return err - } + if !m.bucket.IsObjNotFoundErr(err) { + level.Error(m.logger).Log("msg", "unexpected error", "err", err) + return err + } - case metadataStateUploaded: - r, err := m.bucket.Get(ctx, metadataObjectPath(buildID)) - if err != nil { - level.Error(m.logger).Log("msg", "expected metadata file", "err", err) - return ErrMetadataShouldExist - } - buf := new(bytes.Buffer) - _, err = buf.ReadFrom(r) - if err != nil { - level.Error(m.logger).Log("msg", "ReadFrom failed", "err", err) - return err - } + if err := m.write(ctx, buildID, &metadata{ + State: metadataStateUploading, + BuildID: buildID, + UploadStartedAt: time.Now().Unix(), + }); err != nil { + return fmt.Errorf("failed to write metadata: %w", err) + } - metaData := &metadata{} + level.Debug(m.logger).Log("msg", "marked as uploading", "buildid", buildID) + return nil +} - if err := json.Unmarshal(buf.Bytes(), metaData); err != nil { - level.Error(m.logger).Log("msg", "parsing JSON metadata failed", "err", err) - return err - } +func (m *metadataManager) markAsUploaded(ctx context.Context, buildID, hash string) error { + r, err := m.bucket.Get(ctx, metadataObjectPath(buildID)) + if err != nil { + level.Error(m.logger).Log("msg", "expected metadata file", "err", err) + return ErrMetadataShouldExist + } + buf := new(bytes.Buffer) + _, err = buf.ReadFrom(r) + if err != nil { + return err + } - // There's a small window where a race could happen. - if metaData.State == metadataStateUploaded { - return nil - } + metaData := &metadata{} + if err := json.Unmarshal(buf.Bytes(), metaData); err != nil { + return err + } - if metaData.State != metadataStateUploading { - return ErrMetadataExpectedStateUploading - } + // There's a small window where a race could happen. 
+ if metaData.State == metadataStateUploaded { + return nil + } - metaData.State = metadataStateUploaded - metaData.FinishedUploadAt = time.Now().Unix() + if metaData.State == metadataStateUploading && metaData.BuildID != buildID { + return errors.New("build ids do not match") + } - metadataBytes, _ := json.MarshalIndent(&metaData, "", "\t") - newData := bytes.NewReader(metadataBytes) + metaData.State = metadataStateUploaded + metaData.BuildID = buildID + metaData.Hash = hash + metaData.UploadFinishedAt = time.Now().Unix() - if err := m.bucket.Upload(ctx, metadataObjectPath(buildID), newData); err != nil { - return err - } + metadataBytes, _ := json.MarshalIndent(&metaData, "", "\t") + newData := bytes.NewReader(metadataBytes) + + if err := m.bucket.Upload(ctx, metadataObjectPath(buildID), newData); err != nil { + return err } + + level.Debug(m.logger).Log("msg", "marked as uploaded", "buildid", buildID) return nil } -func (m *metadataManager) fetch(ctx context.Context, buildID string) (metadataState, error) { +func (m *metadataManager) fetch(ctx context.Context, buildID string) (*metadata, error) { r, err := m.bucket.Get(ctx, metadataObjectPath(buildID)) if err != nil { - return metadataStateEmpty, nil + if m.bucket.IsObjNotFoundErr(err) { + return nil, ErrMetadataNotFound + } + return nil, err } buf := new(bytes.Buffer) _, err = buf.ReadFrom(r) if err != nil { - return metadataStateUnknown, err + return nil, err } metaData := &metadata{} if err := json.Unmarshal(buf.Bytes(), metaData); err != nil { - return metadataStateUnknown, err + return nil, err } - return metaData.State, nil + return metaData, nil +} + +func (m *metadataManager) write(ctx context.Context, buildID string, md *metadata) error { + metadataBytes, _ := json.MarshalIndent(md, "", "\t") + r := bytes.NewReader(metadataBytes) + if err := m.bucket.Upload(ctx, metadataObjectPath(buildID), r); err != nil { + level.Error(m.logger).Log("msg", "failed to create metadata file", "err", err) + return err + } + return nil } func metadataObjectPath(buildID string) string { diff --git a/pkg/debuginfo/metadata_test.go b/pkg/debuginfo/metadata_test.go index 803e68f160c..fb866cf5b82 100644 --- a/pkg/debuginfo/metadata_test.go +++ b/pkg/debuginfo/metadata_test.go @@ -66,17 +66,16 @@ func TestMetadata(t *testing.T) { require.NoError(t, err) // Test that the initial state should be empty. - state, err := store.metadataManager.fetch(context.Background(), "fake-build-id") - require.NoError(t, err) - require.Equal(t, metadataStateEmpty, state) + _, err = store.metadataManager.fetch(context.Background(), "fake-build-id") + require.ErrorIs(t, err, ErrMetadataNotFound) // Updating the state should be written to blob storage. 
- err = store.metadataManager.update(context.Background(), "fake-build-id", metadataStateUploading) + err = store.metadataManager.markAsUploading(context.Background(), "fake-build-id") require.NoError(t, err) - state, err = store.metadataManager.fetch(context.Background(), "fake-build-id") + md, err := store.metadataManager.fetch(context.Background(), "fake-build-id") require.NoError(t, err) - require.Equal(t, metadataStateUploading, state) + require.Equal(t, metadataStateUploading, md.State) } func TestMetadata_MarshalJSON(t *testing.T) { @@ -86,20 +85,20 @@ func TestMetadata_MarshalJSON(t *testing.T) { wantErr bool }{ { - m: metadata{State: metadataStateUnknown}, - want: `{"state":"METADATA_STATE_UNKNOWN","started_upload_at":0,"finished_upload_at":0}`, + m: metadata{State: metadataStateUnknown, BuildID: "build_id", Hash: "hash"}, + want: `{"state":"METADATA_STATE_UNKNOWN","build_id":"build_id","hash":"hash","upload_started_at":0,"upload_finished_at":0}`, }, { - m: metadata{State: metadataStateEmpty}, - want: `{"state":"METADATA_STATE_EMPTY","started_upload_at":0,"finished_upload_at":0}`, + m: metadata{State: metadataStateUploading, BuildID: "build_id", Hash: "hash"}, + want: `{"state":"METADATA_STATE_UPLOADING","build_id":"build_id","hash":"hash","upload_started_at":0,"upload_finished_at":0}`, }, { - m: metadata{State: metadataStateUploading}, - want: `{"state":"METADATA_STATE_UPLOADING","started_upload_at":0,"finished_upload_at":0}`, + m: metadata{State: metadataStateUploaded, BuildID: "build_id", Hash: "hash"}, + want: `{"state":"METADATA_STATE_UPLOADED","build_id":"build_id","hash":"hash","upload_started_at":0,"upload_finished_at":0}`, }, { - m: metadata{State: metadataStateUploaded}, - want: `{"state":"METADATA_STATE_UPLOADED","started_upload_at":0,"finished_upload_at":0}`, + m: metadata{State: metadataStateCorrupted, BuildID: "build_id", Hash: "hash"}, + want: `{"state":"METADATA_STATE_CORRUPTED","build_id":"build_id","hash":"hash","upload_started_at":0,"upload_finished_at":0}`, }, } for _, tt := range tests { @@ -125,20 +124,20 @@ func TestMetadata_UnmarshalJSON(t *testing.T) { wantErr bool }{ { - b: []byte(`{"state":"METADATA_STATE_UNKNOWN","started_upload_at":0,"finished_upload_at":0}`), - want: metadata{State: metadataStateUnknown}, + b: []byte(`{"state":"METADATA_STATE_UNKNOWN","build_id":"build_id","hash":"hash","upload_started_at":0,"upload_finished_at":0}`), + want: metadata{State: metadataStateUnknown, BuildID: "build_id", Hash: "hash"}, }, { - b: []byte(`{"state":"METADATA_STATE_EMPTY","started_upload_at":0,"finished_upload_at":0}`), - want: metadata{State: metadataStateEmpty}, + b: []byte(`{"state":"METADATA_STATE_UPLOADING","build_id":"build_id","hash":"hash","upload_started_at":0,"upload_finished_at":0}`), + want: metadata{State: metadataStateUploading, BuildID: "build_id", Hash: "hash"}, }, { - b: []byte(`{"state":"METADATA_STATE_UPLOADING","started_upload_at":0,"finished_upload_at":0}`), - want: metadata{State: metadataStateUploading}, + b: []byte(`{"state":"METADATA_STATE_UPLOADED","build_id":"build_id","hash":"hash","upload_started_at":0,"upload_finished_at":0}`), + want: metadata{State: metadataStateUploaded, BuildID: "build_id", Hash: "hash"}, }, { - b: []byte(`{"state":"METADATA_STATE_UPLOADED","started_upload_at":0,"finished_upload_at":0}`), - want: metadata{State: metadataStateUploaded}, + b: []byte(`{"state":"METADATA_STATE_CORRUPTED","build_id":"build_id","hash":"hash","upload_started_at":0,"upload_finished_at":0}`), + want: metadata{State: 
metadataStateCorrupted, BuildID: "build_id", Hash: "hash"}, }, } for _, tt := range tests { diff --git a/pkg/debuginfo/store.go b/pkg/debuginfo/store.go index a24865f85a0..1efecd09b52 100644 --- a/pkg/debuginfo/store.go +++ b/pkg/debuginfo/store.go @@ -14,8 +14,8 @@ package debuginfo import ( + "bytes" "context" - "debug/elf" "encoding/hex" "errors" "fmt" @@ -24,9 +24,11 @@ import ( "os" "path" "strings" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/nanmu42/limitio" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/client" "google.golang.org/grpc/codes" @@ -35,7 +37,6 @@ import ( debuginfopb "github.com/parca-dev/parca/gen/proto/go/parca/debuginfo/v1alpha1" pb "github.com/parca-dev/parca/gen/proto/go/parca/metastore/v1alpha1" - "github.com/parca-dev/parca/pkg/hash" "github.com/parca-dev/parca/pkg/profile" "github.com/parca-dev/parca/pkg/symbol" "github.com/parca-dev/parca/pkg/symbol/elfutils" @@ -143,7 +144,7 @@ func newCache(cacheCfg []byte) (*FilesystemCacheConfig, error) { func (s *Store) Exists(ctx context.Context, req *debuginfopb.ExistsRequest) (*debuginfopb.ExistsResponse, error) { buildID := req.BuildId - if err := validateID(buildID); err != nil { + if err := validateInput(buildID); err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -152,28 +153,29 @@ func (s *Store) Exists(ctx context.Context, req *debuginfopb.ExistsRequest) (*de return nil, err } - if found && req.Hash != "" { - dbgFile, err := s.fetchObjectFile(ctx, buildID) + if found { + metadataFile, err := s.metadataManager.fetch(ctx, buildID) if err != nil { + if errors.Is(err, ErrMetadataNotFound) { + return &debuginfopb.ExistsResponse{Exists: false}, nil + } return nil, status.Error(codes.Internal, err.Error()) } - h, err := hash.File(dbgFile) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) + // metadata.Hash should not be empty, but let's have the check just in case. + if metadataFile.Hash != "" && metadataFile.Hash == req.Hash { + return &debuginfopb.ExistsResponse{Exists: true}, nil } - // It is not an exact version of what we have so, let the client try to upload it. - if h != req.Hash { - return &debuginfopb.ExistsResponse{ - Exists: false, - }, nil + var exists bool + // If it is not an exact version of the source object file we have, let the client try to upload it.
+ if metadataFile.State == metadataStateUploading { + exists = !isStale(metadataFile) } + return &debuginfopb.ExistsResponse{Exists: exists}, nil } - return &debuginfopb.ExistsResponse{ - Exists: found, - }, nil + return &debuginfopb.ExistsResponse{Exists: false}, nil } func (s *Store) Upload(stream debuginfopb.DebugInfoService_UploadServer) error { @@ -184,28 +186,57 @@ func (s *Store) Upload(stream debuginfopb.DebugInfoService_UploadServer) error { return status.Errorf(codes.Unknown, msg) } - buildID := req.GetInfo().BuildId - if err = validateID(buildID); err != nil { + var ( + buildID = req.GetInfo().BuildId + hash = req.GetInfo().Hash + r = &UploadReader{stream: stream} + ) + if err := s.upload(stream.Context(), buildID, hash, r); err != nil { + return fmt.Errorf("failed to upload debug information file: %w", err) + } + + level.Debug(s.logger).Log("msg", "debug info uploaded", "buildid", buildID) + return stream.SendAndClose(&debuginfopb.UploadResponse{ + BuildId: buildID, + Size: r.size, + }) +} + +func (s *Store) upload(ctx context.Context, buildID, hash string, r io.Reader) error { + if err := validateInput(buildID); err != nil { + err = fmt.Errorf("invalid build ID: %w", err) + return status.Error(codes.InvalidArgument, err.Error()) + } + + if err := validateInput(hash); err != nil { + err = fmt.Errorf("invalid hash: %w", err) return status.Error(codes.InvalidArgument, err.Error()) } level.Debug(s.logger).Log("msg", "trying to upload debug info", "buildid", buildID) - ctx := stream.Context() - if result, err := s.metadataManager.fetch(ctx, buildID); err == nil { - level.Debug(s.logger).Log("msg", "fetching metadata state", "result", result) + metadataFile, err := s.metadataManager.fetch(ctx, buildID) + if err == nil { + level.Debug(s.logger).Log("msg", "fetching metadata state", "result", metadataFile) - switch result { - case metadataStateEmpty: - // Not created yet. + switch metadataFile.State { + case metadataStateCorrupted: + // Corrupted. Re-upload. case metadataStateUploaded: // The debug info was fully uploaded. - fallthrough + return status.Error(codes.AlreadyExists, "debuginfo already exists") case metadataStateUploading: - return status.Error(codes.AlreadyExists, "debuginfo already exists, being uploaded right now") + if !isStale(metadataFile) { + return status.Error(codes.AlreadyExists, "debuginfo already exists, being uploaded right now") + } + // The debug info upload operation most likely failed. 
+ default: + return status.Error(codes.Internal, "unknown metadata state") } } else { - level.Error(s.logger).Log("msg", "failed to fetch metadata state", "err", err) + if !errors.Is(err, ErrMetadataNotFound) { + level.Error(s.logger).Log("msg", "failed to fetch metadata state", "err", err) + } } found, err := s.find(ctx, buildID) @@ -214,72 +245,93 @@ func (s *Store) Upload(stream debuginfopb.DebugInfoService_UploadServer) error { } if found { + if hash != "" && metadataFile != nil { + if metadataFile.Hash == hash { + level.Debug(s.logger).Log("msg", "debug info already exists", "buildid", buildID) + return status.Error(codes.AlreadyExists, "debuginfo already exists") + } + } + objFile, err := s.fetchObjectFile(ctx, buildID) if err != nil { return status.Error(codes.Internal, err.Error()) } - if req.GetInfo().Hash != "" { - h, err := hash.File(objFile) - if err != nil { - return status.Error(codes.Internal, err.Error()) - } - - if h == req.GetInfo().Hash { - level.Debug(s.logger).Log("msg", "debug info already exists", "buildid", buildID) - return status.Error(codes.AlreadyExists, "debuginfo already exists") + if err := elfutils.ValidateFile(objFile); err != nil { + // Failed to validate. Mark the file as corrupted, and let the client try to upload it again. + if err := s.metadataManager.markAsCorrupted(ctx, buildID); err != nil { + level.Warn(s.logger).Log("msg", "failed to update metadata as corrupted", "err", err) } + level.Error(s.logger).Log("msg", "failed to validate object file", "buildid", buildID) + // Client will retry. + return status.Error(codes.Internal, err.Error()) } + // Valid. hasDWARF, err := elfutils.HasDWARF(objFile) - if err != nil && !errors.Is(err, io.EOF) { - var fe *elf.FormatError - if errors.As(err, &fe) { - // Ignore bad magic number if all zero - if !strings.Contains(fe.Error(), "bad magic number '[0 0 0 0]'") { - return status.Error(codes.Internal, err.Error()) - } - } else { - return status.Error(codes.Internal, err.Error()) - } + if err != nil { + level.Debug(s.logger).Log("msg", "failed to check for DWARF", "err", err) } if hasDWARF { - // We probably have the best version. - level.Debug(s.logger).Log("msg", "debug info with DWARF already exists", "buildid", buildID) return status.Error(codes.AlreadyExists, "debuginfo already exists") } } - if err := s.metadataManager.update(ctx, buildID, metadataStateUploading); err != nil { - level.Error(s.logger).Log("msg", "failed to update metadata", "err", err) - } - - // At this point we know that we still have a better version of the debug information file, + // At this point we know that we received a better version of the debug information file, // so let the client upload it. - r := &UploadReader{stream: stream} - if err := s.bucket.Upload(ctx, objectPath(buildID), r); err != nil { + + if err := s.metadataManager.markAsUploading(ctx, buildID); err != nil { + err = fmt.Errorf("failed to update metadata before uploading: %w", err) + return status.Error(codes.Internal, err.Error()) + } + + // limitio.Writer is used to avoid buffer overflow. + // We only need to read the first 64 bytes (at most). + // The ELF header is 52 or 64 bytes long for 32-bit and 64-bit binaries respectively. + // If we receive longer data, we will ignore the rest without an error. + b := bytes.NewBuffer(nil) + w := limitio.NewWriter(b, 64, true) + + // Here we're optimistically uploading the received stream directly to the bucket, + // and if something goes wrong we mark it as corrupted, so it could be overwritten in subsequent calls.
+ // We only want to make sure we don't read a corrupted file while symbolizing. + // We also want to prevent any form of buffering for this data on the server-side, + // thus the optimistic writes directly to the object-store while also writing the header of the file into a buffer, + // so we can validate the ELF header. + if err := s.bucket.Upload(ctx, objectPath(buildID), io.TeeReader(r, w)); err != nil { msg := "failed to upload" level.Error(s.logger).Log("msg", msg, "err", err) return status.Errorf(codes.Unknown, msg) } - if err := s.metadataManager.update(ctx, buildID, metadataStateUploaded); err != nil { - level.Error(s.logger).Log("msg", "failed to update metadata", "err", err) + if err := elfutils.ValidateHeader(b); err != nil { + // Failed to validate. Mark the incoming stream as corrupted, and let the client try to upload it again. + if err := s.metadataManager.markAsCorrupted(ctx, buildID); err != nil { + err = fmt.Errorf("failed to update metadata after upload, as corrupted: %w", err) + return status.Error(codes.Internal, err.Error()) + } + return status.Error(codes.InvalidArgument, err.Error()) + } + + if err := s.metadataManager.markAsUploaded(ctx, buildID, hash); err != nil { + err = fmt.Errorf("failed to update metadata after upload: %w", err) + return status.Error(codes.Internal, err.Error()) } - level.Debug(s.logger).Log("msg", "debug info uploaded", "buildid", buildID) - return stream.SendAndClose(&debuginfopb.UploadResponse{ - BuildId: buildID, - Size: r.size, - }) + + return nil +} + +func isStale(metadataFile *metadata) bool { + return time.Now().Add(-15 * time.Minute).After(time.Unix(metadataFile.UploadStartedAt, 0)) } -func validateID(id string) error { +func validateInput(id string) error { _, err := hex.DecodeString(id) if err != nil { - return fmt.Errorf("failed to validate id: %w", err) + return fmt.Errorf("failed to validate input: %w", err) } if len(id) <= 2 { - return errors.New("unexpectedly short ID") + return errors.New("unexpectedly short input") } return nil @@ -305,6 +357,7 @@ func (s *Store) Symbolize(ctx context.Context, m *pb.Mapping, locations []*pb.Lo buildID := m.BuildId logger := log.With(s.logger, "buildid", buildID) + var downloadedFromDebugInfod bool objFile, err := s.fetchObjectFile(ctx, buildID) if err != nil { // It's ok if we don't have the symbols for given BuildID, it happens too often. @@ -315,19 +368,43 @@ func (s *Store) Symbolize(ctx context.Context, m *pb.Mapping, locations []*pb.Lo if err != nil { return nil, fmt.Errorf("failed to fetch: %w", err) } + downloadedFromDebugInfod = true } // Let's make sure we have the best version of the debug file. - hasDWARF, err := elfutils.HasDWARF(objFile) - if err != nil { - return nil, fmt.Errorf("failed to check for DWARF: %w", err) + if err := elfutils.ValidateFile(objFile); err != nil { + level.Warn(logger).Log("msg", "failed to validate debug information", "err", err) + // Mark the file as corrupted, and let the client try to upload it again.
+ err := s.metadataManager.markAsCorrupted(ctx, buildID) + if err != nil { + level.Warn(logger).Log( + "msg", "failed to mark debug information", + "err", fmt.Errorf("failed to update metadata for corrupted: %w", err), + ) + } + if !downloadedFromDebugInfod { + dbgFile, err := s.fetchDebuginfodFile(ctx, buildID) + if err != nil { + level.Warn(logger).Log("msg", "failed to fetch debuginfod file", "err", err) + } else { + objFile = dbgFile + downloadedFromDebugInfod = true + } + } } - if !hasDWARF { - dbgFile, err := s.fetchDebuginfodFile(ctx, buildID) + if !downloadedFromDebugInfod { + hasDWARF, err := elfutils.HasDWARF(objFile) if err != nil { - level.Warn(logger).Log("msg", "failed to fetch debuginfod file", "err", err) - } else { - objFile = dbgFile + level.Debug(logger).Log("msg", "failed to check for DWARF", "err", err) + } + if !hasDWARF { + // Try to download a better version from debuginfod servers. + dbgFile, err := s.fetchDebuginfodFile(ctx, buildID) + if err != nil { + level.Warn(logger).Log("msg", "failed to fetch debuginfod file", "err", err) + } else { + objFile = dbgFile + } } } diff --git a/pkg/debuginfo/store_test.go b/pkg/debuginfo/store_test.go index 65eed5dfed2..44c9308b538 100644 --- a/pkg/debuginfo/store_test.go +++ b/pkg/debuginfo/store_test.go @@ -16,6 +16,7 @@ package debuginfo import ( "bytes" "context" + "encoding/hex" "io" "io/ioutil" stdlog "log" @@ -88,6 +89,7 @@ func TestStore(t *testing.T) { require.NoError(t, err) defer conn.Close() c := NewDebugInfoClient(conn) + b := bytes.NewBuffer(nil) for i := 0; i < 1024; i++ { b.Write([]byte("a")) } @@ -98,28 +100,31 @@ func TestStore(t *testing.T) { for i := 0; i < 1024; i++ { b.Write([]byte("c")) } - size, err := c.Upload(context.Background(), "abcd", "", b) + _, err = c.Upload(context.Background(), "abcd", "abcd", b) + require.Error(t, err) + + nf, err := os.Open("testdata/validelf_nosections") require.NoError(t, err) - require.Equal(t, uint64(3072), size) - obj, err := s.bucket.Get(context.Background(), "abcd/debuginfo") + _, err = c.Upload(context.Background(), hex.EncodeToString([]byte("nosection")), "abcd", nf) + require.Error(t, err) + + wf, err := os.Open("testdata/validelf_withsections") require.NoError(t, err) - content, err := io.ReadAll(obj) + size, err := c.Upload(context.Background(), hex.EncodeToString([]byte("section")), "abcd", wf) require.NoError(t, err) - require.Equal(t, 3072, len(content)) + require.Equal(t, 7079, int(size)) - for i := 0; i < 1024; i++ { - require.Equal(t, []byte("a")[0], content[i]) - } - for i := 0; i < 1024; i++ { - require.Equal(t, []byte("b")[0], content[i+1024]) - } - for i := 0; i < 1024; i++ { - require.Equal(t, []byte("c")[0], content[i+2048]) - } + obj, err := s.bucket.Get(context.Background(), hex.EncodeToString([]byte("section"))+"/debuginfo") + require.NoError(t, err) + + content, err := io.ReadAll(obj) + require.NoError(t, err) + require.Equal(t, 7079, len(content)) + require.Equal(t, []byte{0x7f, 'E', 'L', 'F'}, content[:4]) - exists, err := c.Exists(context.Background(), "abcd", "") + exists, err := c.Exists(context.Background(), hex.EncodeToString([]byte("section")), "abcd") require.NoError(t, err) require.True(t, exists) } diff --git a/pkg/debuginfo/testdata/validelf_nosections b/pkg/debuginfo/testdata/validelf_nosections new file index 00000000000..e34a416ca3e Binary files /dev/null and b/pkg/debuginfo/testdata/validelf_nosections differ diff --git a/pkg/debuginfo/testdata/validelf_withsections b/pkg/debuginfo/testdata/validelf_withsections new file
mode 100644 index 00000000000..1e70d01acc5 Binary files /dev/null and b/pkg/debuginfo/testdata/validelf_withsections differ diff --git a/pkg/hash/hash.go b/pkg/hash/hash.go index bda634dd32f..1a3db48c323 100644 --- a/pkg/hash/hash.go +++ b/pkg/hash/hash.go @@ -27,13 +27,22 @@ import ( func File(path string) (string, error) { f, err := os.Open(path) if err != nil { - return "", fmt.Errorf("failed to open file: %w", err) + return "", fmt.Errorf("failed to open the file: %w", err) } defer f.Close() h := xxhash.New() if _, err := io.Copy(h, f); err != nil { - return "", fmt.Errorf("failed to hash debug info file: %w", err) + return "", fmt.Errorf("failed to hash the file: %w", err) + } + return hex.EncodeToString(h.Sum(nil)), nil +} + +// Reader returns the hash of the reader. +func Reader(r io.Reader) (string, error) { + h := xxhash.New() + if _, err := io.Copy(h, r); err != nil { + return "", fmt.Errorf("failed to hash the reader: %w", err) } return hex.EncodeToString(h.Sum(nil)), nil } diff --git a/pkg/symbol/elfutils/elfutils.go b/pkg/symbol/elfutils/elfutils.go index 432a4094aa0..cb2a13690e4 100644 --- a/pkg/symbol/elfutils/elfutils.go +++ b/pkg/symbol/elfutils/elfutils.go @@ -14,10 +14,15 @@ package elfutils import ( + "bytes" "debug/elf" + "encoding/binary" "errors" "fmt" + "io" "strings" + + "github.com/nanmu42/limitio" ) var dwarfSuffix = func(s *elf.Section) string { @@ -139,8 +144,8 @@ func IsGoObjFile(path string) (bool, error) { } // HasSymbols reports whether the specified executable or library file contains symbols (both.symtab and .dynsym). -func HasSymbols(filePath string) (bool, error) { - ef, err := elf.Open(filePath) +func HasSymbols(path string) (bool, error) { + ef, err := elf.Open(path) if err != nil { return false, fmt.Errorf("failed to open elf: %w", err) } @@ -153,3 +158,114 @@ } return false, nil } + +// ValidateFile returns an error if the given object file is not valid. +func ValidateFile(path string) error { + elfFile, err := elf.Open(path) + if err != nil { + return err + } + defer elfFile.Close() + + // TODO(kakkoyun): How can we improve this without allocating too much memory? + if len(elfFile.Sections) == 0 { + return errors.New("ELF does not have any sections") + } + return nil +} + +// ValidateHeader returns an error if the given object file header is not valid. +func ValidateHeader(r io.Reader) error { + // Buffer to capture the ELF identification (e_ident) bytes. + buf := bytes.NewBuffer(nil) + // limitio.Writer is used to avoid buffer overflow. + // We only need to read the first 16 bytes (the ELF identification). + // If we receive longer data, we will ignore the rest without an error. + w := limitio.NewWriter(buf, 16, true) + + // NOTICE: The ELF header is 52 or 64 bytes long for 32-bit and 64-bit binaries respectively. + r = io.TeeReader(io.LimitReader(r, 64), w) + + // We need to read the entire header to determine the class of the file.
+ b, err := io.ReadAll(r) + if err != nil { + return err + } + r = bytes.NewReader(b) + + var ident [16]byte + _, err = buf.Read(ident[:]) + if err != nil { + return err + } + if ident[0] != '\x7f' || ident[1] != 'E' || ident[2] != 'L' || ident[3] != 'F' { + return fmt.Errorf("invalid magic number, %s", ident[0:4]) + } + + c := elf.Class(ident[elf.EI_CLASS]) + switch c { + case elf.ELFCLASS32: + case elf.ELFCLASS64: + // ok + default: + return fmt.Errorf("unknown ELF class, %s", c) + } + + var byteOrder binary.ByteOrder + d := elf.Data(ident[elf.EI_DATA]) + switch d { + case elf.ELFDATA2LSB: + byteOrder = binary.LittleEndian + case elf.ELFDATA2MSB: + byteOrder = binary.BigEndian + default: + return fmt.Errorf("unknown ELF data encoding, %s", d) + } + + fv := elf.Version(ident[elf.EI_VERSION]) + if fv != elf.EV_CURRENT { + return fmt.Errorf("unknown ELF version, %s", fv) + } + + // Read ELF file header. + var shoff int64 + var shnum, shstrndx int + switch c { + case elf.ELFCLASS32: + hdr := new(elf.Header32) + if err := binary.Read(io.LimitReader(r, 52), byteOrder, hdr); err != nil { + return err + } + if v := elf.Version(hdr.Version); v != fv { + return fmt.Errorf("invalid ELF version, %s", v) + } + shoff = int64(hdr.Shoff) + shnum = int(hdr.Shnum) + shstrndx = int(hdr.Shstrndx) + case elf.ELFCLASS64: + hdr := new(elf.Header64) + if err := binary.Read(r, byteOrder, hdr); err != nil { + return err + } + if v := elf.Version(hdr.Version); v != fv { + return fmt.Errorf("invalid ELF version, %s", v) + } + shoff = int64(hdr.Shoff) + shnum = int(hdr.Shnum) + shstrndx = int(hdr.Shstrndx) + } + + if shoff == 0 && shnum != 0 { + return fmt.Errorf("invalid ELF file, shoff is 0 but shnum is %d", shnum) + } + + if shnum > 0 && shstrndx >= shnum { + return fmt.Errorf("invalid ELF file, shstrndx is %d but shnum is %d", shstrndx, shnum) + } + + if shnum <= 0 { + return fmt.Errorf("elf file has no sections") + } + + return nil +} diff --git a/pkg/symbolizer/symbolizer.go b/pkg/symbolizer/symbolizer.go index aba696e819b..3089333cf3d 100644 --- a/pkg/symbolizer/symbolizer.go +++ b/pkg/symbolizer/symbolizer.go @@ -56,7 +56,7 @@ func (s *Symbolizer) Run(ctx context.Context, interval time.Duration) error { err = s.symbolize(ctx, lres.Locations) if err != nil { - level.Error(s.logger).Log("msg", "symbolization round finished with errors") + level.Warn(s.logger).Log("msg", "symbolization attempt finished with errors") level.Debug(s.logger).Log("msg", "errors occurred during symbolization", "err", err) } else { level.Info(s.logger).Log("msg", "symbolization round finished successfully") diff --git a/proto/parca/debuginfo/v1alpha1/debuginfo.proto b/proto/parca/debuginfo/v1alpha1/debuginfo.proto index 5a2bfaa0bfc..42e210320e9 100644 --- a/proto/parca/debuginfo/v1alpha1/debuginfo.proto +++ b/proto/parca/debuginfo/v1alpha1/debuginfo.proto @@ -43,8 +43,11 @@ message UploadInfo { // build_id is a unique identifier for the debug data string build_id = 1; - // hash is the hash of the debug information file + // hash is the hash of the source file that the debug information was extracted from string hash = 2; + + // TODO(kakkoyun): Add SourceHash and use Hash as debuginfo file hash. + // TODO(kakkoyun): Add SourceType enum.
} // UploadResponse returns the build_id and the size of the uploaded debug info diff --git a/ui/packages/shared/client/src/parca/debuginfo/v1alpha1/debuginfo.ts b/ui/packages/shared/client/src/parca/debuginfo/v1alpha1/debuginfo.ts index ce6895310c6..62b0b32baba 100644 --- a/ui/packages/shared/client/src/parca/debuginfo/v1alpha1/debuginfo.ts +++ b/ui/packages/shared/client/src/parca/debuginfo/v1alpha1/debuginfo.ts @@ -86,7 +86,7 @@ export interface UploadInfo { */ buildId: string; /** - * hash is the hash of the debug information file + * hash is the hash of the source file that the debug information was extracted from * * @generated from protobuf field: string hash = 2; */
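For readers following the new metadata flow in pkg/debuginfo/metadata.go, here is a hypothetical, test-style sketch of the state machine this change introduces: not-found, then uploading, then uploaded (or corrupted). It is not part of this diff; it would have to live inside package debuginfo, since metadataManager and its markAs* methods are unexported, and it assumes objstore.NewInMemBucket() from github.com/thanos-io/objstore works as a stand-in bucket.

```go
package debuginfo

import (
	"context"
	"errors"
	"testing"

	"github.com/go-kit/log"
	"github.com/thanos-io/objstore"
)

func TestMetadataLifecycleSketch(t *testing.T) {
	ctx := context.Background()
	mgr := newMetadataManager(log.NewNopLogger(), objstore.NewInMemBucket())
	const buildID = "deadbeef"

	// Nothing recorded yet: fetch reports ErrMetadataNotFound instead of
	// the removed metadataStateEmpty sentinel.
	if _, err := mgr.fetch(ctx, buildID); !errors.Is(err, ErrMetadataNotFound) {
		t.Fatalf("expected ErrMetadataNotFound, got: %v", err)
	}

	// Claim the upload; upload_started_at is what isStale later compares
	// against its 15-minute window.
	if err := mgr.markAsUploading(ctx, buildID); err != nil {
		t.Fatal(err)
	}

	// A successful upload records the source-file hash next to the state.
	if err := mgr.markAsUploaded(ctx, buildID, "abcd"); err != nil {
		t.Fatal(err)
	}

	md, err := mgr.fetch(ctx, buildID)
	if err != nil {
		t.Fatal(err)
	}
	if md.State != metadataStateUploaded || md.Hash != "abcd" {
		t.Fatalf("unexpected metadata: %+v", md)
	}
}
```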
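The optimistic upload path in Store.upload streams the request body straight to object storage while teeing at most the first 64 bytes (the maximum ELF header size) into a limitio-capped buffer, so nothing larger is ever buffered server-side. Below is a self-contained sketch of just that pattern; uploadWithHeaderCapture and fakeUpload are hypothetical names, with the fake standing in for bucket.Upload.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"strings"

	"github.com/nanmu42/limitio"
)

// uploadWithHeaderCapture streams r to upload untouched, while teeing the
// first 64 bytes (the maximum ELF header size) into a capped buffer.
func uploadWithHeaderCapture(ctx context.Context, upload func(context.Context, io.Reader) error, r io.Reader) ([]byte, error) {
	b := bytes.NewBuffer(nil)
	// true: silently discard anything written beyond the 64-byte limit.
	w := limitio.NewWriter(b, 64, true)
	if err := upload(ctx, io.TeeReader(r, w)); err != nil {
		return nil, err
	}
	return b.Bytes(), nil
}

func main() {
	fakeUpload := func(_ context.Context, r io.Reader) error {
		n, err := io.Copy(io.Discard, r) // stand-in for bucket.Upload
		fmt.Println("streamed bytes:", n) // 1024
		return err
	}

	header, err := uploadWithHeaderCapture(context.Background(), fakeUpload, strings.NewReader(strings.Repeat("x", 1024)))
	if err != nil {
		panic(err)
	}
	fmt.Println("captured header bytes:", len(header)) // 64
}
```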
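Since elfutils.ValidateHeader consumes at most the first 64 bytes of its reader, it can be pointed at any stream, not only the capture buffer above. A hypothetical check against a local binary (the path is a placeholder; any ELF file works):

```go
package main

import (
	"fmt"
	"os"

	"github.com/parca-dev/parca/pkg/symbol/elfutils"
)

func main() {
	f, err := os.Open("/bin/ls") // placeholder path to some ELF binary
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// ValidateHeader checks the magic number, class, data encoding,
	// version, and the section-header fields of the ELF header.
	if err := elfutils.ValidateHeader(f); err != nil {
		fmt.Println("invalid ELF header:", err)
		return
	}
	fmt.Println("looks like a valid ELF header")
}
```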
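Finally, a hypothetical end-to-end client flow mirroring store_test.go: hash the debug file, ask Exists with that hash, and only Upload when the server holds nothing matching. The address, file path, and build ID are placeholders; Exists and Upload are called with the signatures the test exercises, and the build ID must be hex per the new validateInput.

```go
package main

import (
	"context"
	"fmt"
	"os"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/parca-dev/parca/pkg/debuginfo"
	"github.com/parca-dev/parca/pkg/hash"
)

func main() {
	conn, err := grpc.Dial("localhost:7070", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	c := debuginfo.NewDebugInfoClient(conn)

	const buildID = "deadbeef"       // placeholder; must be hex
	const path = "./my-binary.debug" // placeholder debug info file

	h, err := hash.File(path) // same xxhash helper the server package uses
	if err != nil {
		panic(err)
	}

	ctx := context.Background()
	exists, err := c.Exists(ctx, buildID, h)
	if err != nil {
		panic(err)
	}
	if exists {
		fmt.Println("server already has a matching debug info file")
		return
	}

	f, err := os.Open(path)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	size, err := c.Upload(ctx, buildID, h, f)
	if err != nil {
		panic(err)
	}
	fmt.Println("uploaded bytes:", size)
}
```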