-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RSDK-2880: Add timestamp information to the replay camera #2371
Changes from all commits
cedd896
248837f
84e25de
a97a073
ec03d78
28f4d53
8529267
0c35618
c1b4367
0f6910f
0727bfb
61f8c96
318bcba
0673ab0
985b11c
0a194d2
8f5cc96
1e7c876
3f6ac99
1e1852c
7c0ed22
4073b64
a78a21a
2af5ec8
5f9382d
c3b6b7d
18420da
61c8d69
8d57f0c
ac80ba4
b09c792
1a76a6d
f8b58fd
bdb39a5
68a0b63
d51bd9e
58f2259
5f65cfb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,13 +11,15 @@ import ( | |
"path/filepath" | ||
"strconv" | ||
"testing" | ||
"time" | ||
|
||
"github.com/edaniels/golog" | ||
"github.com/pkg/errors" | ||
datapb "go.viam.com/api/app/data/v1" | ||
"go.viam.com/test" | ||
"go.viam.com/utils/artifact" | ||
"go.viam.com/utils/rpc" | ||
"google.golang.org/protobuf/types/known/timestamppb" | ||
|
||
"go.viam.com/rdk/components/camera" | ||
viamgrpc "go.viam.com/rdk/grpc" | ||
|
@@ -28,6 +30,8 @@ import ( | |
"go.viam.com/rdk/testutils/inject" | ||
) | ||
|
||
const testTime = "2000-01-01T12:00:%02dZ" | ||
|
||
// mockDataServiceServer is a struct that includes unimplemented versions of all the Data Service endpoints. These | ||
// can be overwritten to allow developers to trigger desired behaviors during testing. | ||
type mockDataServiceServer struct { | ||
|
@@ -60,9 +64,17 @@ func (mDServer *mockDataServiceServer) BinaryDataByFilter(ctx context.Context, r | |
gz.Close() | ||
|
||
// Construct response | ||
timeReq, err := time.Parse(time.RFC3339, fmt.Sprintf(testTime, newFileNum)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [q] When should/do we need to use time.PFC3339 and time.RFC339Nano? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right now, in production code paths, we're using RFC3339 for parsing the requested time ranges from the config for the replay sensor. This means that we're limiting "users" (Cloud Story) from configuring anything here with higher granularity than 1 second. This matches what we do in the Data tab and I think this is ok! We then use RFC3339Nano whenever we're stringifying or parsing one of the timestamps correlated with the images. These have nanosecond granularity so we need to use a time format that supports that so we don't lose that information. Here in the tests I'm using RFC3339 because I'm only changing the second of the timestamp depending on the file, so I hard-coded something in the RFC3339 format. |
||
if err != nil { | ||
return nil, errors.Wrap(err, "failed parsing time") | ||
} | ||
timeRec := timeReq.Add(time.Second) | ||
binaryData := &datapb.BinaryData{ | ||
Binary: dataBuf.Bytes(), | ||
Metadata: &datapb.BinaryMetadata{}, | ||
Binary: dataBuf.Bytes(), | ||
Metadata: &datapb.BinaryMetadata{ | ||
TimeRequested: timestamppb.New(timeReq), | ||
TimeReceived: timestamppb.New(timeRec), | ||
}, | ||
} | ||
|
||
resp := &datapb.BinaryDataByFilterResponse{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,10 +10,10 @@ import ( | |
"github.com/google/uuid" | ||
"go.viam.com/test" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/metadata" | ||
|
||
"go.viam.com/rdk/resource" | ||
"go.viam.com/rdk/session" | ||
"go.viam.com/rdk/testutils" | ||
) | ||
|
||
func TestToFromContext(t *testing.T) { | ||
|
@@ -64,7 +64,7 @@ func TestSafetyMonitor(t *testing.T) { | |
} | ||
|
||
func TestSafetyMonitorForMetadata(t *testing.T) { | ||
stream1 := &myStream{} | ||
stream1 := testutils.NewServerTransportStream() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 |
||
streamCtx := grpc.NewContextWithServerTransportStream(context.Background(), stream1) | ||
|
||
sess1 := session.New(context.Background(), "ownerID", time.Minute, nil) | ||
|
@@ -73,26 +73,13 @@ func TestSafetyMonitorForMetadata(t *testing.T) { | |
name1 := resource.NewName(resource.APINamespace("foo").WithType("bar").WithSubtype("baz"), "barf") | ||
name2 := resource.NewName(resource.APINamespace("woo").WithType("war").WithSubtype("waz"), "warf") | ||
session.SafetyMonitor(nextCtx, myThing{Named: name1.AsNamed()}) | ||
test.That(t, stream1.md[session.SafetyMonitoredResourceMetadataKey], test.ShouldResemble, []string{name1.String()}) | ||
test.That(t, stream1.Value(session.SafetyMonitoredResourceMetadataKey), test.ShouldResemble, []string{name1.String()}) | ||
session.SafetyMonitor(nextCtx, myThing{Named: name2.AsNamed()}) | ||
test.That(t, stream1.md[session.SafetyMonitoredResourceMetadataKey], test.ShouldResemble, []string{name2.String()}) | ||
test.That(t, stream1.Value(session.SafetyMonitoredResourceMetadataKey), test.ShouldResemble, []string{name2.String()}) | ||
} | ||
|
||
type myThing struct { | ||
resource.Named | ||
resource.AlwaysRebuild | ||
resource.TriviallyCloseable | ||
} | ||
|
||
type myStream struct { | ||
mu sync.Mutex | ||
grpc.ServerTransportStream | ||
md metadata.MD | ||
} | ||
|
||
func (s *myStream) SetHeader(md metadata.MD) error { | ||
s.mu.Lock() | ||
defer s.mu.Unlock() | ||
s.md = md.Copy() | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,9 +3,11 @@ package testutils | |
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
"go.viam.com/utils/rpc" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/metadata" | ||
) | ||
|
||
// TrackingDialer tracks dial attempts. | ||
|
@@ -48,3 +50,35 @@ func (td *TrackingDialer) DialFunc( | |
} | ||
return conn, cached, err | ||
} | ||
|
||
// ServerTransportStream implements grpc.ServerTransportStream and can be used to test setting | ||
// metadata in the gRPC response header. | ||
type ServerTransportStream struct { | ||
mu sync.Mutex | ||
grpc.ServerTransportStream | ||
md metadata.MD | ||
} | ||
|
||
// NewServerTransportStream creates a new ServerTransportStream. | ||
func NewServerTransportStream() *ServerTransportStream { | ||
return &ServerTransportStream{ | ||
md: metadata.New(make(map[string]string)), | ||
} | ||
} | ||
|
||
// SetHeader implements grpc.ServerTransportStream. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit] might be worth explaining how this is supposed to differ from SendHeader and why in this mock up the code is the same There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Very good question, I actually meant to follow up on this because I wasn't sure myself, and I forgot. Turns out the different between SetHeader and SendHeader is that SetHeader is supposed to merge the new metadata fields with the existing metadata fields, whereas SendHeader overwrites the metadata and is only supposed to be called once. So I think there were two things that were slightly wrong here: our implementation of SetHeader, which completely overwrites the metadata rather than merging, and the fact that in replaypcd camera I'm calling SendHeader, which afaik will overwrite any metadata already set by SetHeader and error if anyone else tries to call SendHeader or SetHeader after that point. I've changed both; thanks for calling this out! cc @edaniels for awareness of this change (Jeremy's out so can't re-request his review) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch |
||
func (s *ServerTransportStream) SetHeader(md metadata.MD) error { | ||
s.mu.Lock() | ||
defer s.mu.Unlock() | ||
for k, v := range md { | ||
s.md[k] = v | ||
} | ||
return nil | ||
} | ||
|
||
// Value returns the value in the metadata map corresponding to a given key. | ||
func (s *ServerTransportStream) Value(key string) []string { | ||
s.mu.Lock() | ||
defer s.mu.Unlock() | ||
return s.md[key] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[q] Does the metadata context need to be created every time? If not we could move lines 443-444 to 435 and call:
ctx = grpc.NewContextWithServerTransportStream(context.Background(), serverStream)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't need to be, but I felt this more closely mimicked how this works in prod with consecutive gRPC calls using separate ServerTransportStreams and gRPC contexts. So I'm going to leave this as-is unless someone else feels strongly that this should change.