Skip to content

Commit

Permalink
Merge pull request #1 from KeranYang/mapT
Browse files Browse the repository at this point in the history
Map t
  • Loading branch information
KeranYang authored Jan 18, 2023
2 parents b89bb93 + 8c3c3fb commit 684d92b
Show file tree
Hide file tree
Showing 23 changed files with 674 additions and 117 deletions.
20 changes: 20 additions & 0 deletions pkg/apis/proto/function/v1/funcmock/funcmock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 30 additions & 25 deletions pkg/apis/proto/function/v1/udfunction.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions pkg/apis/proto/function/v1/udfunction.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ import "google/protobuf/empty.proto";
package function.v1;

service UserDefinedFunction {
// Applies a function to each datum element.
// MapFn applies a function to each datum element without modifying event time.
rpc MapFn(Datum) returns (DatumList);

// Applies a reduce function to a datum stream.
// MapTFn applies a function to each datum element.
// In addition to map function, MapTFn also supports assigning a new event time to datum.
rpc MapTFn(Datum) returns (DatumList);

// ReduceFn applies a reduce function to a datum stream.
rpc ReduceFn(stream Datum) returns (DatumList);

// IsReady is the heartbeat endpoint for gRPC.
Expand Down
50 changes: 45 additions & 5 deletions pkg/apis/proto/function/v1/udfunction_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions pkg/apis/proto/sink/v1/udsink.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sink/v1/udsink_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion pkg/function/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *client) IsReady(ctx context.Context, in *emptypb.Empty) (bool, error) {
return resp.GetReady(), nil
}

// MapFn applies a function to each datum element.
// MapFn applies a function to each datum element without modifying event time.
func (c *client) MapFn(ctx context.Context, datum *functionpb.Datum) ([]*functionpb.Datum, error) {
mappedDatumList, err := c.grpcClt.MapFn(ctx, datum)
if err != nil {
Expand All @@ -63,6 +63,17 @@ func (c *client) MapFn(ctx context.Context, datum *functionpb.Datum) ([]*functio
return mappedDatumList.GetElements(), nil
}

// MapTFn applies a function to each datum element.
// In addition to map function, MapTFn also supports assigning a new event time to datum.
func (c *client) MapTFn(ctx context.Context, datum *functionpb.Datum) ([]*functionpb.Datum, error) {
mappedDatumList, err := c.grpcClt.MapTFn(ctx, datum)
if err != nil {
return nil, fmt.Errorf("failed to execute c.grpcClt.MapTFn(): %w", err)
}

return mappedDatumList.GetElements(), nil
}

// ReduceFn applies a reduce function to a datum stream.
func (c *client) ReduceFn(ctx context.Context, datumStreamCh <-chan *functionpb.Datum) ([]*functionpb.Datum, error) {
stream, err := c.grpcClt.ReduceFn(ctx)
Expand Down
37 changes: 37 additions & 0 deletions pkg/function/clienttest/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,40 @@ func TestMapFn(t *testing.T) {
assert.Nil(t, got)
assert.EqualError(t, err, "failed to execute c.grpcClt.MapFn(): mock MapFn error")
}

func TestMapTFn(t *testing.T) {
var ctx = context.Background()

ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockClient := funcmock.NewMockUserDefinedFunctionClient(ctrl)
testDatum := &functionpb.Datum{
Key: "test_success_key",
Value: []byte(`forward_message`),
EventTime: &functionpb.EventTime{EventTime: timestamppb.New(time.Unix(1661169600, 0))},
Watermark: &functionpb.Watermark{Watermark: timestamppb.New(time.Time{})},
}
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: testDatum}).Return(&functionpb.DatumList{
Elements: []*functionpb.Datum{
testDatum,
},
}, nil)
mockClient.EXPECT().MapTFn(gomock.Any(), &rpcMsg{msg: testDatum}).Return(&functionpb.DatumList{
Elements: []*functionpb.Datum{
nil,
},
}, fmt.Errorf("mock MapTFn error"))
testClient, err := New(mockClient)
assert.NoError(t, err)
reflect.DeepEqual(testClient, &client{
grpcClt: mockClient,
})
got, err := testClient.MapTFn(ctx, testDatum)
reflect.DeepEqual(got, testDatum)
assert.NoError(t, err)

got, err = testClient.MapTFn(ctx, testDatum)
assert.Nil(t, got)
assert.EqualError(t, err, "failed to execute c.grpcClt.MapTFn(): mock MapTFn error")
}
13 changes: 12 additions & 1 deletion pkg/function/clienttest/clienttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (c *client) IsReady(ctx context.Context, in *emptypb.Empty) (bool, error) {
return resp.GetReady(), nil
}

// MapFn applies a function to each datum element.
// MapFn applies a function to each datum element without modifying event time.
func (c *client) MapFn(ctx context.Context, datum *functionpb.Datum) ([]*functionpb.Datum, error) {
mappedDatumList, err := c.grpcClt.MapFn(ctx, datum)
if err != nil {
Expand All @@ -43,6 +43,17 @@ func (c *client) MapFn(ctx context.Context, datum *functionpb.Datum) ([]*functio
return mappedDatumList.GetElements(), nil
}

// MapTFn applies a function to each datum element.
// In addition to map function, MapTFn also supports assigning a new event time to datum.
func (c *client) MapTFn(ctx context.Context, datum *functionpb.Datum) ([]*functionpb.Datum, error) {
mappedDatumList, err := c.grpcClt.MapTFn(ctx, datum)
if err != nil {
return nil, fmt.Errorf("failed to execute c.grpcClt.MapTFn(): %w", err)
}

return mappedDatumList.GetElements(), nil
}

// ReduceFn applies a reduce function to a datum stream.
func (c *client) ReduceFn(ctx context.Context, datumStreamCh <-chan *functionpb.Datum) ([]*functionpb.Datum, error) {
stream, err := c.grpcClt.ReduceFn(ctx)
Expand Down
Loading

0 comments on commit 684d92b

Please sign in to comment.