Skip to content

Commit

Permalink
resolved comments
Browse files Browse the repository at this point in the history
  • Loading branch information
riteshghorse committed Apr 5, 2022
1 parent 3a4557f commit 0010a67
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 16 deletions.
6 changes: 3 additions & 3 deletions sdks/go/pkg/beam/core/runtime/harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options
if err != nil {
log.Errorf(ctx, "error establishing connection to worker status API: %v", err)
} else {
statusHandler.wg.Add(1)
statusHandler.start(ctx)
defer statusHandler.stop(ctx)
if err := statusHandler.start(ctx); err == nil {
defer statusHandler.stop(ctx)
}
}
}

Expand Down
12 changes: 8 additions & 4 deletions sdks/go/pkg/beam/core/runtime/harness/worker_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,16 @@ func (w *workerStatusHandler) shutdown() {
}

// start starts the reader to accept WorkerStatusRequest and send WorkerStatusResponse with WorkerStatus API.
func (w *workerStatusHandler) start(ctx context.Context) {
func (w *workerStatusHandler) start(ctx context.Context) error {
statusClient := fnpb.NewBeamFnWorkerStatusClient(w.conn)
stub, err := statusClient.WorkerStatus(ctx)
if err != nil {
log.Errorf(ctx, "status client not established: %v", err)
return
return errors.WithContext(err, "status endpoint client not established")
}
w.wg.Add(1)
go w.reader(ctx, stub)
return nil
}

// reader reads the WorkerStatusRequest from the stream and sends a processed WorkerStatusResponse to
Expand All @@ -69,7 +71,7 @@ func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorker
buf := make([]byte, 1<<16)
for w.isAlive() {
req, err := stub.Recv()
if err != nil {
if err != nil && err != io.EOF {
log.Debugf(ctx, "exiting workerStatusHandler.Reader(): %v", err)
return
}
Expand All @@ -83,10 +85,12 @@ func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorker
}

// stop stops the reader and closes worker status endpoint connection with the runner.
func (w *workerStatusHandler) stop(ctx context.Context) {
func (w *workerStatusHandler) stop(ctx context.Context) error {
w.shutdown()
w.wg.Wait()
if err := w.conn.Close(); err != nil {
log.Errorf(ctx, "error closing status endpoint connection: %v", err)
return errors.WithContext(err, "error closing status endpoint connection")
}
return nil
}
23 changes: 14 additions & 9 deletions sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ func (w *BeamFnWorkerStatusServicer) WorkerStatus(b fnpb.BeamFnWorkerStatus_Work
return nil
}

const buffsize = 1024 * 1024

var lis *bufconn.Listener

func setup(t *testing.T, srv *BeamFnWorkerStatusServicer) {
const buffsize = 1024 * 1024
server := grpc.NewServer()
lis = bufconn.Listen(buffsize)
fnpb.RegisterBeamFnWorkerStatusServer(server, srv)
go func() {
if err := server.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
panic(err)
}
}()
t.Cleanup(func() {
Expand All @@ -68,19 +68,24 @@ func TestSendStatusResponse(t *testing.T) {
ctx := context.Background()
srv := &BeamFnWorkerStatusServicer{response: make(chan string)}
setup(t, srv)
conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(dialer), grpc.WithInsecure())

conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithInsecure(), grpc.WithContextDialer(dialer))
if err != nil {
t.Fatalf("unable to start test server: %v", err)
}

statusHandler := workerStatusHandler{conn: conn}
statusHandler.wg.Add(1)
statusHandler.start(ctx)
t.Cleanup(func() {
statusHandler.stop(ctx)
})
if err := statusHandler.start(ctx); err != nil {
t.Fatal(err)
}

response := []string{}
response = append(response, <-srv.response)
if len(response) == 0 {
t.Errorf("error in response: %v", response)
t.Errorf("no response received: %v", response)
}

if err := statusHandler.stop(ctx); err != nil {
t.Error(err)
}
}

0 comments on commit 0010a67

Please sign in to comment.