Skip to content

Commit

Permalink
Copy persisted artifacts when replaying an invocation (#8118)
Browse files Browse the repository at this point in the history
When replaying an invocation, copy the artifacts from blobstore to the
destination cache target. The destination BES handler should then copy
those blobs to its local blobstore, which should allow the artifacts to
be visible in the UI. This is necessary since we recently switched the
/file/download endpoint to avoid reading from cache. I tested this
change before/after and verified it fixes the issue where test logs are
not visible when replaying an invocation.
  • Loading branch information
bduffany authored Jan 6, 2025
1 parent c68c4d8 commit a67a2c5
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 0 deletions.
5 changes: 5 additions & 0 deletions tools/replay_invocation/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@ go_library(
"//proto:publish_build_event_go_proto",
"//server/backends/blobstore",
"//server/backends/chunkstore",
"//server/build_event_protocol/accumulator",
"//server/build_event_protocol/build_event_handler",
"//server/eventlog",
"//server/interfaces",
"//server/real_environment",
"//server/remote_cache/cachetools",
"//server/remote_cache/digest",
"//server/util/grpc_client",
"//server/util/healthcheck",
"//server/util/log",
"//server/util/proto",
"//server/util/protofile",
"@com_github_google_uuid//:uuid",
"@org_golang_google_genproto_googleapis_bytestream//:bytestream",
"@org_golang_google_grpc//metadata",
"@org_golang_google_protobuf//types/known/anypb",
],
Expand Down
62 changes: 62 additions & 0 deletions tools/replay_invocation/replay_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,23 @@ package main
import (
"context"
"flag"
"fmt"
"io"
"net/url"
"os"
"path"
"regexp"
"strings"

"github.com/buildbuddy-io/buildbuddy/server/backends/blobstore"
"github.com/buildbuddy-io/buildbuddy/server/backends/chunkstore"
"github.com/buildbuddy-io/buildbuddy/server/build_event_protocol/accumulator"
"github.com/buildbuddy-io/buildbuddy/server/build_event_protocol/build_event_handler"
"github.com/buildbuddy-io/buildbuddy/server/eventlog"
"github.com/buildbuddy-io/buildbuddy/server/interfaces"
"github.com/buildbuddy-io/buildbuddy/server/real_environment"
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/cachetools"
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest"
"github.com/buildbuddy-io/buildbuddy/server/util/grpc_client"
"github.com/buildbuddy-io/buildbuddy/server/util/healthcheck"
"github.com/buildbuddy-io/buildbuddy/server/util/log"
Expand All @@ -26,17 +33,21 @@ import (
bepb "github.com/buildbuddy-io/buildbuddy/proto/build_events"
inpb "github.com/buildbuddy-io/buildbuddy/proto/invocation"
pepb "github.com/buildbuddy-io/buildbuddy/proto/publish_build_event"
bspb "google.golang.org/genproto/googleapis/bytestream"
)

var (
invocationID = flag.String("invocation_id", "", "The invocation ID to replay.")
besBackend = flag.String("bes_backend", "", "The bes backend to replay events to.")
besResultsURL = flag.String("bes_results_url", "", "The invocation URL prefix")
cacheTarget = flag.String("cache_target", "", "Cache target where artifacts are copied, if applicable. Defaults to bes_backend.")
apiKey = flag.String("api_key", "", "The API key of the account that will own the replayed events")
printLogs = flag.Bool("print_logs", false, "Copy logs from Progress events to stdout/stderr.")
// TODO: Figure out the latest attempt number automatically.
attemptNumber = flag.Int("attempt", 1, "Invocation attempt number.")

copyArtifacts = flag.Bool("copy_artifacts", false, "Copy blobstore-persisted invocation artifacts to the cache target. This is required to view test logs, timing profile, and other files in the build event stream.")

metadataOverride arrayFlags

// Note: you will also need to configure a blobstore.
Expand Down Expand Up @@ -97,6 +108,18 @@ func main() {
defer conn.Close()
client := pepb.NewPublishBuildEventClient(conn)

var cacheConn *grpc_client.ClientConnPool
if *cacheTarget == "" {
// Default to bes_backend connection.
cacheConn = conn
} else {
c, err := grpc_client.DialSimple(*cacheTarget)
if err != nil {
log.Fatalf("Error dialing cache target: %s", err)
}
cacheConn = c
}
bytestreamClient := bspb.NewByteStreamClient(cacheConn)
if *apiKey != "" {
ctx = metadata.AppendToOutgoingContext(ctx, "x-buildbuddy-api-key", *apiKey)
}
Expand Down Expand Up @@ -160,6 +183,23 @@ func main() {
p.BuildMetadata.Metadata[parts[0]] = parts[1]
}
}

if *copyArtifacts {
// Use the logic from accumulator just to parse output files from
// events.
fileAccumulator := accumulator.NewBEValues(&inpb.Invocation{})
fileAccumulator.AddEvent(buildEvent)
// Copy artifacts from the source blobstore to the target cache before
// publishing the event containing the bytestream URL references.
for _, f := range fileAccumulator.OutputFiles() {
if err := copyArtifact(ctx, bytestreamClient, bs, f.GetUri()); err != nil {
log.Warningf("Failed to copy file %q: %s", f.GetUri(), err)
continue
}
log.Infof("Copied persisted artifact %q", f.GetUri())
}
}

a := &anypb.Any{}
if err := a.MarshalFrom(buildEvent); err != nil {
log.Fatalf("Error marshaling bazel event to any: %s", err.Error())
Expand Down Expand Up @@ -242,3 +282,25 @@ func main() {

log.Infof("Done! Results should be visible at %s", invocationURL)
}

// Copies a persisted artifact from the given blobstore to the destination cache
// target.
func copyArtifact(ctx context.Context, dst bspb.ByteStreamClient, src interfaces.Blobstore, uri string) error {
rn, err := digest.ParseDownloadResourceName(uri)
if err != nil {
return fmt.Errorf("parse bytestream URI as resource name: %w", err)
}
parsedURL, err := url.Parse(uri)
if err != nil {
return fmt.Errorf("parse bytestream URI as URL: %w", err)
}
blobName := path.Join(*invocationID, "artifacts", "cache", parsedURL.Path)
b, err := src.ReadBlob(ctx, blobName)
if err != nil {
return fmt.Errorf("read blob %q: %w", blobName, err)
}
if _, err := cachetools.UploadBlobToCAS(ctx, dst, rn.GetInstanceName(), rn.GetDigestFunction(), b); err != nil {
return fmt.Errorf("upload blob to CAS: %w", err)
}
return nil
}

0 comments on commit a67a2c5

Please sign in to comment.