
RSDK-2880: Add timestamp information to the replay camera #2371

Merged · 38 commits · May 22, 2023

Commits
cedd896
Add timestamps to camera client and server
dmhilly May 9, 2023
248837f
Remove print stmts, start tests
dmhilly May 9, 2023
84e25de
Merge branch 'main' into RSDK-2880
dmhilly May 9, 2023
a97a073
Add PointCloudSourceWithTimestamps
dmhilly May 9, 2023
ec03d78
Dont need to use NextPointCloudTimestamps
dmhilly May 10, 2023
28f4d53
Compile error
dmhilly May 10, 2023
8529267
Merge branch 'main' into RSDK-2880
dmhilly May 10, 2023
0c35618
Added an interceptor
dmhilly May 10, 2023
c1b4367
Revert "Added an interceptor"
dmhilly May 10, 2023
0f6910f
Add back NextPointCloudWithTimestamps
dmhilly May 10, 2023
0727bfb
This works
dmhilly May 11, 2023
61f8c96
Introduce and use ContextWithMetadata
dmhilly May 15, 2023
318bcba
It works
dmhilly May 15, 2023
0673ab0
Try with interceptor
dmhilly May 15, 2023
985b11c
Revert "Try with interceptor"
dmhilly May 15, 2023
0a194d2
Clean up
dmhilly May 15, 2023
8f5cc96
Clean up
dmhilly May 16, 2023
1e7c876
Merge branch 'main' into RSDK-2880
dmhilly May 16, 2023
3f6ac99
Fix
dmhilly May 16, 2023
1e1852c
Interceptor works
dmhilly May 16, 2023
7c0ed22
Make more generic
dmhilly May 16, 2023
4073b64
Improvements
dmhilly May 16, 2023
a78a21a
Use higher precision when stringifying time
dmhilly May 16, 2023
2af5ec8
Move time req around
dmhilly May 17, 2023
5f9382d
PR feedback
dmhilly May 17, 2023
c3b6b7d
Move things around
dmhilly May 17, 2023
18420da
Typo
dmhilly May 18, 2023
61c8d69
Test timestamps in header
dmhilly May 18, 2023
8d57f0c
Factor out common code
dmhilly May 18, 2023
ac80ba4
Lint issues
dmhilly May 18, 2023
b09c792
Added test for the context interceptor
dmhilly May 18, 2023
1a76a6d
Improve comment and close connection:
dmhilly May 18, 2023
f8b58fd
PR feedback
dmhilly May 22, 2023
bdb39a5
Added unit test
dmhilly May 22, 2023
68a0b63
More test stuff
dmhilly May 22, 2023
d51bd9e
Linter
dmhilly May 22, 2023
58f2259
Change const name
dmhilly May 22, 2023
5f65cfb
Merge branch 'main' into RSDK-2880
dmhilly May 22, 2023
65 changes: 65 additions & 0 deletions components/camera/client_test.go
@@ -14,6 +14,8 @@ import (
"github.com/edaniels/gostream"
"go.viam.com/test"
"go.viam.com/utils/rpc"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"go.viam.com/rdk/components/camera"
viamgrpc "go.viam.com/rdk/grpc"
@@ -24,6 +26,7 @@ import (
"go.viam.com/rdk/testutils"
"go.viam.com/rdk/testutils/inject"
rutils "go.viam.com/rdk/utils"
"go.viam.com/rdk/utils/contextutils"
)

func TestClient(t *testing.T) {
@@ -390,3 +393,65 @@ func TestClientLazyImage(t *testing.T) {

test.That(t, conn.Close(), test.ShouldBeNil)
}

func TestClientWithInterceptor(t *testing.T) {
// Set up gRPC server
logger := golog.NewTestLogger(t)
listener1, err := net.Listen("tcp", "localhost:0")
test.That(t, err, test.ShouldBeNil)
rpcServer, err := rpc.NewServer(logger, rpc.WithUnauthenticated())
test.That(t, err, test.ShouldBeNil)

// Set up camera that adds timestamps into the gRPC response header.
injectCamera := &inject.Camera{}

pcA := pointcloud.New()
err = pcA.Set(pointcloud.NewVector(5, 5, 5), nil)
test.That(t, err, test.ShouldBeNil)

k, v := "hello", "world"
injectCamera.NextPointCloudFunc = func(ctx context.Context) (pointcloud.PointCloud, error) {
var grpcMetadata metadata.MD = make(map[string][]string)
grpcMetadata.Set(k, v)
grpc.SendHeader(ctx, grpcMetadata)
return pcA, nil
}

// Register CameraService API in our gRPC server.
resources := map[resource.Name]camera.Camera{
camera.Named(testCameraName): injectCamera,
}
cameraSvc, err := resource.NewAPIResourceCollection(camera.API, resources)
test.That(t, err, test.ShouldBeNil)
resourceAPI, ok, err := resource.LookupAPIRegistration[camera.Camera](camera.API)
test.That(t, err, test.ShouldBeNil)
test.That(t, ok, test.ShouldBeTrue)
test.That(t, resourceAPI.RegisterRPCService(context.Background(), rpcServer, cameraSvc), test.ShouldBeNil)

// Start serving requests.
go rpcServer.Serve(listener1)
defer rpcServer.Stop()

// Set up gRPC client with context with metadata interceptor.
conn, err := viamgrpc.Dial(
context.Background(),
listener1.Addr().String(),
logger,
rpc.WithUnaryClientInterceptor(contextutils.ContextWithMetadataUnaryClientInterceptor),
)
test.That(t, err, test.ShouldBeNil)
camera1Client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger)
test.That(t, err, test.ShouldBeNil)

// Construct a ContextWithMetadata to pass into NextPointCloud and check that the
// interceptor correctly injected the metadata from the gRPC response header into the
// context.
ctx, md := contextutils.ContextWithMetadata(context.Background())
pcB, err := camera1Client.NextPointCloud(ctx)
test.That(t, err, test.ShouldBeNil)
_, got := pcB.At(5, 5, 5)
test.That(t, got, test.ShouldBeTrue)
test.That(t, md[k][0], test.ShouldEqual, v)

test.That(t, conn.Close(), test.ShouldBeNil)
}
25 changes: 24 additions & 1 deletion components/camera/replaypcd/replaypcd.go
@@ -14,13 +14,16 @@ import (
datapb "go.viam.com/api/app/data/v1"
goutils "go.viam.com/utils"
"go.viam.com/utils/rpc"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/timestamppb"

"go.viam.com/rdk/components/camera"
"go.viam.com/rdk/internal/cloud"
"go.viam.com/rdk/pointcloud"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/rimage/transform"
"go.viam.com/rdk/utils/contextutils"
)

// Model is the model of a replay camera.
@@ -146,8 +149,28 @@ func (replay *pcdCamera) NextPointCloud(ctx context.Context) (pointcloud.PointCl
return nil, errEndOfDataset
}

// If the caller is communicating with the replay camera over gRPC, set the timestamps on
// the gRPC header.
md := resp.GetData()[0].GetMetadata()
if stream := grpc.ServerTransportStreamFromContext(ctx); stream != nil {
var grpcMetadata metadata.MD = make(map[string][]string)

timeReq := md.GetTimeRequested()
if timeReq != nil {
grpcMetadata.Set(contextutils.TimeRequestedMetadataKey, timeReq.AsTime().Format(time.RFC3339Nano))
}
timeRec := md.GetTimeReceived()
if timeRec != nil {
grpcMetadata.Set(contextutils.TimeReceivedMetadataKey, timeRec.AsTime().Format(time.RFC3339Nano))
}

if err := grpc.SetHeader(ctx, grpcMetadata); err != nil {
return nil, err
}
}

replay.lastData = resp.GetLast()
-	data := resp.Data[0].GetBinary()
+	data := resp.GetData()[0].GetBinary()

r, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
43 changes: 43 additions & 0 deletions components/camera/replaypcd/replaypcd_test.go
@@ -12,9 +12,12 @@ import (
"go.viam.com/test"
"go.viam.com/utils"
"go.viam.com/utils/artifact"
"google.golang.org/grpc"

"go.viam.com/rdk/internal/cloud"
"go.viam.com/rdk/pointcloud"
"go.viam.com/rdk/testutils"
"go.viam.com/rdk/utils/contextutils"
)

const datasetDirectory = "slam/mock_lidar/%d.pcd"
@@ -424,3 +427,43 @@ func TestUnimplementedFunctions(t *testing.T) {

test.That(t, serverClose(), test.ShouldBeNil)
}

// TestNextPointCloudTimestamps tests that calls to NextPointCloud on the replay camera will inject
// the time received and time requested metadata into the gRPC response header.
func TestNextPointCloudTimestamps(t *testing.T) {
// Construct replay camera.
ctx := context.Background()
cfg := &Config{Source: "source"}
replayCamera, serverClose, err := createNewReplayPCDCamera(ctx, t, cfg, true)
test.That(t, err, test.ShouldBeNil)
test.That(t, replayCamera, test.ShouldNotBeNil)

// Repeatedly call NextPointCloud, checking for timestamps in the gRPC header.
for i := 0; i < numPCDFiles; i++ {
serverStream := testutils.NewServerTransportStream()
Review comment (Contributor): [q] Does the metadata context need to be created every time? If not we could move lines 443-444 to 435 and call:

ctx = grpc.NewContextWithServerTransportStream(context.Background(), serverStream)

Reply (dmhilly, Member Author): It doesn't need to be, but I felt this more closely mimicked how this works in prod with consecutive gRPC calls using separate ServerTransportStreams and gRPC contexts. So I'm going to leave this as-is unless someone else feels strongly that this should change.
ctx = grpc.NewContextWithServerTransportStream(ctx, serverStream)
pc, err := replayCamera.NextPointCloud(ctx)
test.That(t, err, test.ShouldBeNil)
test.That(t, pc, test.ShouldResemble, getPointCloudFromArtifact(t, i))

expectedTimeReq := fmt.Sprintf(testTime, i)
expectedTimeRec := fmt.Sprintf(testTime, i+1)

actualTimeReq := serverStream.Value(contextutils.TimeRequestedMetadataKey)[0]
actualTimeRec := serverStream.Value(contextutils.TimeReceivedMetadataKey)[0]

test.That(t, expectedTimeReq, test.ShouldEqual, actualTimeReq)
test.That(t, expectedTimeRec, test.ShouldEqual, actualTimeRec)
}

// Confirm the end of the dataset was reached when expected
pc, err := replayCamera.NextPointCloud(ctx)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, errEndOfDataset.Error())
test.That(t, pc, test.ShouldBeNil)

err = replayCamera.Close(ctx)
test.That(t, err, test.ShouldBeNil)

test.That(t, serverClose(), test.ShouldBeNil)
}
16 changes: 14 additions & 2 deletions components/camera/replaypcd/replaypcd_utils_test.go
@@ -11,13 +11,15 @@ import (
"path/filepath"
"strconv"
"testing"
"time"

"github.com/edaniels/golog"
"github.com/pkg/errors"
datapb "go.viam.com/api/app/data/v1"
"go.viam.com/test"
"go.viam.com/utils/artifact"
"go.viam.com/utils/rpc"
"google.golang.org/protobuf/types/known/timestamppb"

"go.viam.com/rdk/components/camera"
viamgrpc "go.viam.com/rdk/grpc"
@@ -28,6 +30,8 @@ import (
"go.viam.com/rdk/testutils/inject"
)

const testTime = "2000-01-01T12:00:%02dZ"

// mockDataServiceServer is a struct that includes unimplemented versions of all the Data Service endpoints. These
// can be overwritten to allow developers to trigger desired behaviors during testing.
type mockDataServiceServer struct {
@@ -60,9 +64,17 @@ func (mDServer *mockDataServiceServer) BinaryDataByFilter(ctx context.Context, r
gz.Close()

// Construct response
timeReq, err := time.Parse(time.RFC3339, fmt.Sprintf(testTime, newFileNum))
Review comment (Contributor): [q] When should/do we need to use time.RFC3339 and time.RFC3339Nano?

Reply (dmhilly, Member Author): Right now, in production code paths, we're using RFC3339 for parsing the requested time ranges from the config for the replay sensor. This means that we're limiting "users" (Cloud Story) from configuring anything here with higher granularity than 1 second. This matches what we do in the Data tab and I think this is ok!

We then use RFC3339Nano whenever we're stringifying or parsing one of the timestamps correlated with the images. These have nanosecond granularity, so we need to use a time format that supports that so we don't lose that information.

Here in the tests I'm using RFC3339 because I'm only changing the second of the timestamp depending on the file, so I hard-coded something in the RFC3339 format.
if err != nil {
return nil, errors.Wrap(err, "failed parsing time")
}
timeRec := timeReq.Add(time.Second)
binaryData := &datapb.BinaryData{
-	Binary: dataBuf.Bytes(),
-	Metadata: &datapb.BinaryMetadata{},
+	Binary: dataBuf.Bytes(),
+	Metadata: &datapb.BinaryMetadata{
+		TimeRequested: timestamppb.New(timeReq),
+		TimeReceived: timestamppb.New(timeRec),
+	},
}

resp := &datapb.BinaryDataByFilterResponse{
2 changes: 2 additions & 0 deletions robot/client/client.go
@@ -38,6 +38,7 @@ import (
"go.viam.com/rdk/robot/packages"
"go.viam.com/rdk/session"
"go.viam.com/rdk/spatialmath"
"go.viam.com/rdk/utils/contextutils"
)

var (
@@ -278,6 +279,7 @@ func New(ctx context.Context, address string, logger golog.Logger, opts ...Robot
// interceptors are applied in order from first to last
rc.dialOptions = append(
rc.dialOptions,
rpc.WithUnaryClientInterceptor(contextutils.ContextWithMetadataUnaryClientInterceptor),
// error handling
rpc.WithUnaryClientInterceptor(rc.handleUnaryDisconnect),
rpc.WithStreamClientInterceptor(rc.handleStreamDisconnect),
21 changes: 4 additions & 17 deletions session/context_test.go
@@ -10,10 +10,10 @@ import (
"github.com/google/uuid"
"go.viam.com/test"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"go.viam.com/rdk/resource"
"go.viam.com/rdk/session"
"go.viam.com/rdk/testutils"
)

func TestToFromContext(t *testing.T) {
@@ -64,7 +64,7 @@ func TestSafetyMonitor(t *testing.T) {
}

func TestSafetyMonitorForMetadata(t *testing.T) {
-	stream1 := &myStream{}
+	stream1 := testutils.NewServerTransportStream()
Review comment (Contributor): nice

Review comment (Contributor): +1
streamCtx := grpc.NewContextWithServerTransportStream(context.Background(), stream1)

sess1 := session.New(context.Background(), "ownerID", time.Minute, nil)
@@ -73,26 +73,13 @@
name1 := resource.NewName(resource.APINamespace("foo").WithType("bar").WithSubtype("baz"), "barf")
name2 := resource.NewName(resource.APINamespace("woo").WithType("war").WithSubtype("waz"), "warf")
session.SafetyMonitor(nextCtx, myThing{Named: name1.AsNamed()})
-	test.That(t, stream1.md[session.SafetyMonitoredResourceMetadataKey], test.ShouldResemble, []string{name1.String()})
+	test.That(t, stream1.Value(session.SafetyMonitoredResourceMetadataKey), test.ShouldResemble, []string{name1.String()})
session.SafetyMonitor(nextCtx, myThing{Named: name2.AsNamed()})
-	test.That(t, stream1.md[session.SafetyMonitoredResourceMetadataKey], test.ShouldResemble, []string{name2.String()})
+	test.That(t, stream1.Value(session.SafetyMonitoredResourceMetadataKey), test.ShouldResemble, []string{name2.String()})
}

type myThing struct {
resource.Named
resource.AlwaysRebuild
resource.TriviallyCloseable
}

type myStream struct {
mu sync.Mutex
grpc.ServerTransportStream
md metadata.MD
}

func (s *myStream) SetHeader(md metadata.MD) error {
s.mu.Lock()
defer s.mu.Unlock()
s.md = md.Copy()
return nil
}
34 changes: 34 additions & 0 deletions testutils/rpc.go
@@ -3,9 +3,11 @@ package testutils

import (
"context"
"sync"

"go.viam.com/utils/rpc"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// TrackingDialer tracks dial attempts.
@@ -48,3 +50,35 @@
}
return conn, cached, err
}

// ServerTransportStream implements grpc.ServerTransportStream and can be used to test setting
// metadata in the gRPC response header.
type ServerTransportStream struct {
mu sync.Mutex
grpc.ServerTransportStream
md metadata.MD
}

// NewServerTransportStream creates a new ServerTransportStream.
func NewServerTransportStream() *ServerTransportStream {
return &ServerTransportStream{
md: metadata.New(make(map[string]string)),
}
}

// SetHeader implements grpc.ServerTransportStream.
Review comment (Contributor): [nit] might be worth explaining how this is supposed to differ from SendHeader and why in this mock-up the code is the same

Reply (dmhilly, Member Author): Very good question; I actually meant to follow up on this because I wasn't sure myself, and I forgot. It turns out the difference between SetHeader and SendHeader is that SetHeader is supposed to merge the new metadata fields with the existing metadata fields, whereas SendHeader overwrites the metadata and is only supposed to be called once. So I think there were two things that were slightly wrong here: our implementation of SetHeader, which completely overwrites the metadata rather than merging, and the fact that in the replaypcd camera I'm calling SendHeader, which afaik will overwrite any metadata already set by SetHeader and error if anyone else tries to call SendHeader or SetHeader after that point. I've changed both; thanks for calling this out! cc @edaniels for awareness of this change (Jeremy's out so can't re-request his review)

Review comment (Contributor): Good catch

func (s *ServerTransportStream) SetHeader(md metadata.MD) error {
s.mu.Lock()
defer s.mu.Unlock()
for k, v := range md {
s.md[k] = v
}
return nil
}

// Value returns the value in the metadata map corresponding to a given key.
func (s *ServerTransportStream) Value(key string) []string {
s.mu.Lock()
defer s.mu.Unlock()
return s.md[key]
}
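The SetHeader/SendHeader distinction discussed in the review thread above can be sketched with a toy model. This is a simplified stand-in, not the real grpc.ServerTransportStream API: SetHeader merges metadata and may be called repeatedly before the header is written, while SendHeader writes the header exactly once and rejects any later Set/SendHeader calls.

```go
package main

import (
	"errors"
	"fmt"
)

// headerWriter models the server-side header semantics: accumulated metadata
// plus a flag recording whether the header has already gone out on the wire.
type headerWriter struct {
	md   map[string][]string
	sent bool
}

// SetHeader merges new metadata into the existing metadata; it may be called
// any number of times until the header is sent.
func (h *headerWriter) SetHeader(md map[string][]string) error {
	if h.sent {
		return errors.New("header already sent")
	}
	if h.md == nil {
		h.md = map[string][]string{}
	}
	for k, v := range md {
		h.md[k] = append(h.md[k], v...) // merge, don't overwrite
	}
	return nil
}

// SendHeader merges one final batch of metadata and marks the header as sent,
// after which further Set/SendHeader calls fail.
func (h *headerWriter) SendHeader(md map[string][]string) error {
	if err := h.SetHeader(md); err != nil {
		return err
	}
	h.sent = true
	return nil
}

func main() {
	h := &headerWriter{}
	h.SetHeader(map[string][]string{"time_requested": {"t0"}})
	h.SendHeader(map[string][]string{"time_received": {"t1"}})
	fmt.Println(len(h.md)) // 2 — both keys survived the merge

	// A second SendHeader (or SetHeader) now fails.
	fmt.Println(h.SendHeader(map[string][]string{"x": {"y"}}) != nil) // true
}
```

This model also shows why the two bugs the author describes mattered: a SetHeader that overwrites instead of merging would drop earlier keys, and calling SendHeader from the camera would block any later header writes for that RPC.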