Skip to content

Commit

Permalink
[Go SDK] Add more info to Worker Status API (apache#21776)
Browse files Browse the repository at this point in the history
  • Loading branch information
riteshghorse authored and bullet03 committed Jun 20, 2022
1 parent 3f9b473 commit 845a5cf
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 40 deletions.
2 changes: 1 addition & 1 deletion sdks/go/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func main() {
"--options=" + options,
}
if info.GetStatusEndpoint() != nil {
args = append(args, "--status_endpoint="+info.GetStatusEndpoint().GetUrl())
os.Setenv("STATUS_ENDPOINT", info.GetStatusEndpoint().GetUrl())
}

if len(info.GetRunnerCapabilities()) > 0 {
Expand Down
50 changes: 50 additions & 0 deletions sdks/go/pkg/beam/core/metrics/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package metrics

import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
)

// Implementation note: We avoid depending on the FnAPI protos here
Expand Down Expand Up @@ -156,6 +159,22 @@ type ptCounterSet struct {

type bundleProcState int

// String returns the upper-case label of the bundle processing state, which
// makes bundleProcState satisfy fmt.Stringer. Any value other than the four
// known states yields a fixed fallback message.
func (b bundleProcState) String() string {
	label := "unknown process bundle state!"
	switch b {
	case StartBundle:
		label = "START_BUNDLE"
	case ProcessBundle:
		label = "PROCESS_BUNDLE"
	case FinishBundle:
		label = "FINISH_BUNDLE"
	case TotalBundle:
		label = "TOTAL_BUNDLE"
	}
	return label
}

const (
// StartBundle indicates starting state of a bundle
StartBundle bundleProcState = 0
Expand All @@ -174,12 +193,22 @@ type ExecutionState struct {
TotalTime time.Duration
}

// String formats the state, the processing flag and the total time as an
// indented multi-line report, which makes ExecutionState satisfy fmt.Stringer.
func (e ExecutionState) String() string {
	const layout = "Execution State:\n\t State: %s\n\t IsProcessing: %v\n\t Total time: %v\n"
	return fmt.Sprintf(layout, e.State, e.IsProcessing, e.TotalTime)
}

// BundleState stores information about a PTransform for execution time metrics.
type BundleState struct {
pid string
currentState bundleProcState
}

// String formats the PTransform ID and the current processing state as an
// indented two-field report, which makes BundleState satisfy fmt.Stringer.
func (b BundleState) String() string {
	const layout = "Bundle State:\n\t PTransform ID: %s\n\t Current state: %s"
	return fmt.Sprintf(layout, b.pid, b.currentState)
}

// currentStateVal exports the current state of a bundle wrt PTransform.
type currentStateVal struct {
pid string
Expand Down Expand Up @@ -218,3 +247,24 @@ func (b *Store) storeMetric(pid string, n name, m userMetric) {
}
b.store[l] = m
}

// BundleState returns the printable form of the store's current bundle state.
// The bundleState field is read through an atomic pointer load and the
// pointed-to value is formatted with BundleState.String.
func (b *Store) BundleState() string {
	// Reinterpret the address of the field so it can be handed to
	// atomic.LoadPointer, then convert the loaded pointer back to its real type.
	fieldAddr := (*unsafe.Pointer)(unsafe.Pointer(&b.bundleState))
	current := (*BundleState)(atomic.LoadPointer(fieldAddr))
	return (*current).String()
}

// StateRegistry returns a printable dump of the store's state registry. For
// every registry key (labelled "Bundle ID" in the output) it lists the
// execution states held at indices 0 through 3. The store mutex is held while
// the dump is assembled.
func (b *Store) StateRegistry() string {
	b.mu.Lock()
	defer b.mu.Unlock()

	var out strings.Builder
	out.WriteString("\n | All Bundle Process States | \n")
	for id, states := range b.stateRegistry {
		fmt.Fprintf(&out, "\tBundle ID: %s\n", id)
		for idx := 0; idx < 4; idx++ {
			fmt.Fprintf(&out, "\t%s\n", states[idx])
		}
	}
	return out.String()
}
52 changes: 26 additions & 26 deletions sdks/go/pkg/beam/core/runtime/harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
)

// StatusAddress is a type of status endpoint address as an optional argument to harness.Main().
type StatusAddress string

// URNMonitoringInfoShortID is a URN indicating support for short monitoring info IDs.
const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"

Expand All @@ -50,22 +47,14 @@ const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"
// Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
// "pipeline-construction time" -- on each worker. It is a FnAPI client and
// ultimately responsible for correctly executing user code.
func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options ...interface{}) error {
func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
hooks.DeserializeHooksFromOptions(ctx)

statusEndpoint := ""
for _, option := range options {
switch option := option.(type) {
case StatusAddress:
statusEndpoint = string(option)
default:
return errors.Errorf("unknown type %T, value %v in error call", option, option)
}
}

// Extract environment variables. These are optional runner supported capabilities.
// Expected env variables:
// RUNNER_CAPABILITIES : list of runner supported capability urn.
// STATUS_ENDPOINT : Endpoint to connect to status server used for worker status reporting.
statusEndpoint := os.Getenv("STATUS_ENDPOINT")
runnerCapabilities := strings.Split(os.Getenv("RUNNER_CAPABILITIES"), " ")
rcMap := make(map[string]bool)
if len(runnerCapabilities) > 0 {
Expand Down Expand Up @@ -128,18 +117,6 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options
log.Debugf(ctx, "control response channel closed")
}()

// if the runner supports worker status api then expose SDK harness status
if statusEndpoint != "" {
statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint)
if err != nil {
log.Errorf(ctx, "error establishing connection to worker status API: %v", err)
} else {
if err := statusHandler.start(ctx); err == nil {
defer statusHandler.stop(ctx)
}
}
}

sideCache := statecache.SideInputCache{}
sideCache.Init(cacheSize)

Expand All @@ -157,6 +134,19 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options
cache: &sideCache,
runnerCapabilities: rcMap,
}

// if the runner supports worker status api then expose SDK harness status
if statusEndpoint != "" {
statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint, ctrl.cache, func(statusInfo *strings.Builder) { ctrl.metStoreToString(statusInfo) })
if err != nil {
log.Errorf(ctx, "error establishing connection to worker status API: %v", err)
} else {
if err := statusHandler.start(ctx); err == nil {
defer statusHandler.stop(ctx)
}
}
}

// gRPC requires all readers of a stream be the same goroutine, so this goroutine
// is responsible for managing the network data. All it does is pull data from
// the stream, and hand off the message to a goroutine to actually be handled,
Expand Down Expand Up @@ -296,6 +286,16 @@ type control struct {
runnerCapabilities map[string]bool
}

// metStoreToString appends one entry per element of c.metStore to statusInfo:
// the bundle ID, followed by that store's bundle state and its state registry
// dump. c.mu is held for the whole iteration.
func (c *control) metStoreToString(statusInfo *strings.Builder) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for id, metrics := range c.metStore {
		fmt.Fprintf(statusInfo, "Bundle ID: %v\n", id)
		fmt.Fprintf(statusInfo, "\t%s", metrics.BundleState())
		fmt.Fprintf(statusInfo, "\t%s", metrics.StateRegistry())
	}
}

func (c *control) getOrCreatePlan(bdID bundleDescriptorID) (*exec.Plan, error) {
c.mu.Lock()
plans, ok := c.plans[bdID]
Expand Down
3 changes: 1 addition & 2 deletions sdks/go/pkg/beam/core/runtime/harness/init/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ var (
id = flag.String("id", "", "Local identifier (required in worker mode).")
loggingEndpoint = flag.String("logging_endpoint", "", "Local logging gRPC endpoint (required in worker mode).")
controlEndpoint = flag.String("control_endpoint", "", "Local control gRPC endpoint (required in worker mode).")
statusEndpoint = flag.String("status_endpoint", "", "Local status gRPC endpoint (optional in worker mode).")
//lint:ignore U1000 semiPersistDir flag is passed in through the boot container, will need to be removed later
semiPersistDir = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional in worker mode).")
options = flag.String("options", "", "JSON-encoded pipeline options (required in worker mode).")
Expand Down Expand Up @@ -108,7 +107,7 @@ func hook() {
// does, and establish the background context here.

ctx := grpcx.WriteWorkerID(context.Background(), *id)
if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint, harness.StatusAddress(*statusEndpoint)); err != nil {
if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint); err != nil {
fmt.Fprintf(os.Stderr, "Worker failed: %v\n", err)
switch ShutdownMode {
case Terminate:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,10 @@ func (c *SideInputCache) evictElement(ctx context.Context) {
}
}
}

// CacheMetrics returns a copy of the side input cache's metrics, taken while
// the cache mutex is held.
func (c *SideInputCache) CacheMetrics() CacheMetrics {
	c.mu.Lock()
	snapshot := c.metrics
	c.mu.Unlock()
	return snapshot
}
66 changes: 58 additions & 8 deletions sdks/go/pkg/beam/core/runtime/harness/worker_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ package harness

import (
"context"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"
"io"
"runtime"
"runtime/debug"
"runtime/pprof"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -31,17 +36,19 @@ import (

// workerStatusHandler stores the communication information of WorkerStatus API.
type workerStatusHandler struct {
conn *grpc.ClientConn
shouldShutdown int32
wg sync.WaitGroup
conn *grpc.ClientConn
shouldShutdown int32
wg sync.WaitGroup
cache *statecache.SideInputCache
metStoreToString func(*strings.Builder)
}

func newWorkerStatusHandler(ctx context.Context, endpoint string) (*workerStatusHandler, error) {
func newWorkerStatusHandler(ctx context.Context, endpoint string, cache *statecache.SideInputCache, metStoreToString func(*strings.Builder)) (*workerStatusHandler, error) {
sconn, err := dial(ctx, endpoint, 60*time.Second)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect: %v\n", endpoint)
}
return &workerStatusHandler{conn: sconn, shouldShutdown: 0}, nil
return &workerStatusHandler{conn: sconn, shouldShutdown: 0, cache: cache, metStoreToString: metStoreToString}, nil
}

func (w *workerStatusHandler) isAlive() bool {
Expand All @@ -65,20 +72,63 @@ func (w *workerStatusHandler) start(ctx context.Context) error {
return nil
}

// memoryUsage appends a "Memory Usage" section to statusInfo, built from a
// runtime.MemStats snapshot. Heap and stack sizes and the next GC target are
// reported in MB (byte counts shifted right by 20 bits), followed by the GC
// CPU percentage and the time of the last completed garbage collection.
func memoryUsage(statusInfo *strings.Builder) {
	const mbShift = 20 // a byte count >> 20 is the size in MB

	statusInfo.WriteString("\n============Memory Usage============\n")

	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	fmt.Fprintf(statusInfo, "heap in-use-spans/allocated/total/max = %d/%d/%d/%d MB\n",
		m.HeapInuse>>mbShift, m.HeapAlloc>>mbShift, m.TotalAlloc>>mbShift, m.HeapSys>>mbShift)
	fmt.Fprintf(statusInfo, "stack in-use-spans/max = %d/%d MB\n", m.StackInuse>>mbShift, m.StackSys>>mbShift)
	fmt.Fprintf(statusInfo, "GC-CPU percentage = %.2f %%\n", m.GCCPUFraction*100)

	// LastGC is expressed in nanoseconds since the Unix epoch. A zero value
	// means no collection has been recorded yet; formatting it as a time would
	// print a misleading 1970 timestamp, so report that case explicitly.
	if m.LastGC == 0 {
		statusInfo.WriteString("Last GC time: never\n")
	} else {
		fmt.Fprintf(statusInfo, "Last GC time: %v\n", time.Unix(0, int64(m.LastGC)))
	}
	fmt.Fprintf(statusInfo, "Next GC: %v MB\n", m.NextGC>>mbShift)
}

// activeProcessBundleStates appends the "Active Process Bundle States" section
// header to statusInfo and then lets the handler's metStoreToString callback
// write the section body.
func (w *workerStatusHandler) activeProcessBundleStates(statusInfo *strings.Builder) {
	const header = "\n============Active Process Bundle States============\n"
	statusInfo.WriteString(header)
	w.metStoreToString(statusInfo)
}

// cacheStats appends the "Cache Stats" section to statusInfo, printing the
// metrics returned by the handler's side input cache with field names (%+v).
func (w *workerStatusHandler) cacheStats(statusInfo *strings.Builder) {
	statusInfo.WriteString("\n============Cache Stats============\n")
	metrics := w.cache.CacheMetrics()
	fmt.Fprintf(statusInfo, "State Cache:\n%+v\n", metrics)
}

// goroutineDump appends the "Goroutine Dump" section to statusInfo, containing
// the text form of the runtime's "goroutine" pprof profile. If the profile
// lookup returns nil only the header is written; if writing the profile
// fails, the error is recorded in the section instead of being dropped.
func goroutineDump(statusInfo *strings.Builder) {
	statusInfo.WriteString("\n============Goroutine Dump============\n")

	profile := pprof.Lookup("goroutine")
	if profile == nil {
		return
	}
	// Debug level 1 requests the human-readable text format rather than the
	// compressed protobuf encoding.
	if err := profile.WriteTo(statusInfo, 1); err != nil {
		fmt.Fprintf(statusInfo, "unable to write goroutine profile: %v\n", err)
	}
}

// buildInfo appends the "Build Info" section to statusInfo. The body is the
// string form of debug.ReadBuildInfo's result; when no build information is
// available only the header is written.
func buildInfo(statusInfo *strings.Builder) {
	statusInfo.WriteString("\n============Build Info============\n")

	info, ok := debug.ReadBuildInfo()
	if !ok {
		return
	}
	statusInfo.WriteString(info.String())
}

// reader receives WorkerStatusRequests from the stream and, for each one,
// sends a WorkerStatusResponse carrying the assembled status report back on the same stream.
func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
defer w.wg.Done()
buf := make([]byte, 1<<16)

for w.isAlive() {
req, err := stub.Recv()
if err != nil && err != io.EOF {
log.Debugf(ctx, "exiting workerStatusHandler.Reader(): %v", err)
return
}
log.Debugf(ctx, "RECV-status: %v", req.GetId())
runtime.Stack(buf, true)
response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: string(buf)}

statusInfo := &strings.Builder{}
memoryUsage(statusInfo)
w.activeProcessBundleStates(statusInfo)
w.cacheStats(statusInfo)
goroutineDump(statusInfo)
buildInfo(statusInfo)

response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: statusInfo.String()}
if err := stub.Send(response); err != nil && err != io.EOF {
log.Errorf(ctx, "workerStatus.Writer: Failed to respond: %v", err)
}
Expand Down
6 changes: 5 additions & 1 deletion sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package harness
import (
"context"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"
"log"
"net"
"strings"
"testing"

fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
Expand Down Expand Up @@ -75,7 +77,9 @@ func TestSendStatusResponse(t *testing.T) {
t.Fatalf("unable to start test server: %v", err)
}

statusHandler := workerStatusHandler{conn: conn}
statusHandler := workerStatusHandler{conn: conn, cache: &statecache.SideInputCache{}, metStoreToString: func(builder *strings.Builder) {
builder.WriteString("metStore metadata")
}}
if err := statusHandler.start(ctx); err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 1 addition & 2 deletions sdks/go/test/integration/wordcount/wordcount.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ package wordcount

import (
"context"
"fmt"
"regexp"
"strings"

"fmt"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
Expand Down

0 comments on commit 845a5cf

Please sign in to comment.