Skip to content

Commit

Permalink
remove command pattern from bulk upload segment users
Browse files Browse the repository at this point in the history
  • Loading branch information
hvn2k1 committed Jan 7, 2025
1 parent 06bed4f commit 9f679fe
Show file tree
Hide file tree
Showing 13 changed files with 1,373 additions and 668 deletions.
65 changes: 65 additions & 0 deletions api-description/web-api.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3368,6 +3368,55 @@ paths:
$ref: '#/definitions/featureCreateSegmentRequest'
tags:
- segment
/v1/segment_users/bulk_upload:
post:
summary: Bulk upload
description: Bulk upload segment users.
operationId: web.v1.segment_users.bulk_upload
responses:
"200":
description: A successful response.
schema:
$ref: '#/definitions/featureBulkUploadSegmentUsersResponse'
"400":
description: Returned for bad requests that may have failed validation.
schema:
$ref: '#/definitions/googlerpcStatus'
examples:
application/json:
code: 3
details: []
message: invalid arguments error
"401":
description: Request could not be authenticated (authentication required).
schema:
$ref: '#/definitions/googlerpcStatus'
examples:
application/json:
code: 16
details: []
message: not authenticated
"503":
description: Returned for internal errors.
schema:
$ref: '#/definitions/googlerpcStatus'
examples:
application/json:
code: 13
details: []
message: internal
default:
description: An unexpected error response.
schema:
$ref: '#/definitions/googlerpcStatus'
parameters:
- name: body
in: body
required: true
schema:
$ref: '#/definitions/featureBulkUploadSegmentUsersRequest'
tags:
- segment
/v1/segments:
get:
summary: List
Expand Down Expand Up @@ -5249,6 +5298,22 @@ definitions:
title: segment user ids separated by comma or new line
state:
$ref: '#/definitions/featureSegmentUserState'
featureBulkUploadSegmentUsersRequest:
type: object
properties:
segmentId:
type: string
command:
$ref: '#/definitions/featureBulkUploadSegmentUsersCommand'
description: deprecated
environmentId:
type: string
data:
type: string
format: byte
description: segment user ids separated by comma or new line
state:
$ref: '#/definitions/featureSegmentUserState'
featureBulkUploadSegmentUsersResponse:
type: object
featureChangeDescriptionCommand:
Expand Down
2 changes: 1 addition & 1 deletion manifests/bucketeer/charts/api/values.yaml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion manifests/bucketeer/charts/web/values.yaml

Large diffs are not rendered by default.

135 changes: 134 additions & 1 deletion pkg/feature/api/segment_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ package api
import (
"bytes"
"context"
"errors"
"strconv"
"strings"

"github.com/jinzhu/copier"
"go.uber.org/zap"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

domainevent "github.com/bucketeer-io/bucketeer/pkg/domainevent/domain"
"github.com/bucketeer-io/bucketeer/pkg/feature/command"
"github.com/bucketeer-io/bucketeer/pkg/feature/domain"
v2fs "github.com/bucketeer-io/bucketeer/pkg/feature/storage/v2"
Expand Down Expand Up @@ -390,6 +393,9 @@ func (s *FeatureService) BulkUploadSegmentUsers(
if err != nil {
return nil, err
}
if req.Command == nil {
return s.bulkUploadSegmentUsersNoCommand(ctx, req, editor, localizer)
}
if err := validateBulkUploadSegmentUsersRequest(req, localizer); err != nil {
s.logger.Error(
"Invalid argument",
Expand Down Expand Up @@ -485,7 +491,134 @@ func (s *FeatureService) BulkUploadSegmentUsers(
)
})
if err != nil {
if err == v2fs.ErrSegmentNotFound || err == v2fs.ErrFeatureUnexpectedAffectedRows {
if errors.Is(err, v2fs.ErrSegmentNotFound) || errors.Is(err, v2fs.ErrFeatureUnexpectedAffectedRows) {
dt, err := statusSegmentNotFound.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: localizer.MustLocalize(locale.NotFoundError),
})
if err != nil {
return nil, statusInternal.Err()
}
return nil, dt.Err()
}
if status.Code(err) == codes.FailedPrecondition {
return nil, err
}
s.logger.Error(
"Failed to bulk upload segment users",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.String("environmentId", req.EnvironmentId),
)...,
)
dt, err := statusInternal.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: localizer.MustLocalize(locale.InternalServerError),
})
if err != nil {
return nil, statusInternal.Err()
}
return nil, dt.Err()
}
return &featureproto.BulkUploadSegmentUsersResponse{}, nil
}

func (s *FeatureService) bulkUploadSegmentUsersNoCommand(
ctx context.Context,
req *featureproto.BulkUploadSegmentUsersRequest,
editor *eventproto.Editor,
localizer locale.Localizer,
) (*featureproto.BulkUploadSegmentUsersResponse, error) {
if err := validateBulkUploadSegmentUsersNoCommandRequest(req, localizer); err != nil {
s.logger.Error(
"Invalid argument",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
zap.String("environmentId", req.EnvironmentId),
)...,
)
return nil, err
}
tx, err := s.mysqlClient.BeginTx(ctx)
if err != nil {
s.logger.Error(
"Failed to begin transaction",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
)...,
)
dt, err := statusInternal.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: localizer.MustLocalize(locale.InternalServerError),
})
if err != nil {
return nil, statusInternal.Err()
}
return nil, dt.Err()
}
err = s.mysqlClient.RunInTransaction(ctx, tx, func() error {
segmentStorage := v2fs.NewSegmentStorage(tx)
segment, _, err := segmentStorage.GetSegment(ctx, req.SegmentId, req.EnvironmentId)
if err != nil {
return err
}
if segment.IsInUseStatus {
dt, err := statusSegmentInUse.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: localizer.MustLocalize(locale.SegmentInUse),
})
if err != nil {
return statusInternal.Err()
}
return dt.Err()
}
if segment.Status == featureproto.Segment_UPLOADING {
dt, err := statusSegmentUsersAlreadyUploading.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: localizer.MustLocalize(locale.SegmentUsersAlreadyUploading),
})
if err != nil {
return statusInternal.Err()
}
return dt.Err()
}
prev := &domain.Segment{}
if err := copier.Copy(prev, segment); err != nil {
return err
}
segment.SetStatus(featureproto.Segment_UPLOADING)
if err := segmentStorage.UpdateSegment(ctx, segment, req.EnvironmentId); err != nil {
return err
}
e, err := domainevent.NewEvent(
editor,
eventproto.Event_SEGMENT,
segment.Id,
eventproto.Event_SEGMENT_BULK_UPLOAD_USERS,
&eventproto.SegmentBulkUploadUsersEvent{
SegmentId: segment.Id,
Status: featureproto.Segment_UPLOADING,
State: req.State,
},
req.EnvironmentId,
segment.Segment,
prev,
)
err = s.domainPublisher.Publish(ctx, e)
if err != nil {
return err
}
return s.publishBulkSegmentUsersReceivedEvent(
ctx,
editor,
req.EnvironmentId,
req.SegmentId,
req.Data,
req.State,
)
})
if err != nil {
if errors.Is(err, v2fs.ErrSegmentNotFound) || errors.Is(err, v2fs.ErrFeatureUnexpectedAffectedRows) {
dt, err := statusSegmentNotFound.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: localizer.MustLocalize(locale.NotFoundError),
Expand Down
139 changes: 130 additions & 9 deletions pkg/feature/api/segment_user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,9 @@ func TestBulkUploadSegmentUsersMySQL(t *testing.T) {
setup: nil,
environmentId: "ns0",
segmentID: "",
cmd: nil,
cmd: &featureproto.BulkUploadSegmentUsersCommand{},
expectedErr: createError(statusMissingSegmentID, localizer.MustLocalizeWithTemplate(locale.RequiredFieldTemplate, "segment_id")),
},
{
desc: "ErrMissingCommand",
setup: nil,
environmentId: "ns0",
segmentID: "id",
cmd: nil,
expectedErr: createError(statusMissingCommand, localizer.MustLocalizeWithTemplate(locale.InvalidArgumentError, "command")),
},
{
desc: "ErrMissingSegmentUsersData",
setup: nil,
Expand Down Expand Up @@ -174,6 +166,135 @@ func TestBulkUploadSegmentUsersMySQL(t *testing.T) {
}
}

func TestBulkUploadSegmentUsersNoCommandMySQL(t *testing.T) {
t.Parallel()
mockController := gomock.NewController(t)
defer mockController.Finish()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctx = metadata.NewIncomingContext(ctx, metadata.MD{
"accept-language": []string{"ja"},
})
localizer := locale.NewLocalizer(ctx)
createError := func(status *gstatus.Status, msg string) error {
st, err := status.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: msg,
})
require.NoError(t, err)
return st.Err()
}

testcases := []struct {
desc string
setup func(*FeatureService)
req *featureproto.BulkUploadSegmentUsersRequest
environmentId string
expectedErr error
}{
{
desc: "ErrMissingSegmentID",
setup: nil,
req: &featureproto.BulkUploadSegmentUsersRequest{
EnvironmentId: "ns0",
SegmentId: "",
},
expectedErr: createError(statusMissingSegmentID, localizer.MustLocalizeWithTemplate(locale.RequiredFieldTemplate, "segment_id")),
},
{
desc: "ErrMissingSegmentUsersData",
setup: nil,
req: &featureproto.BulkUploadSegmentUsersRequest{
EnvironmentId: "ns0",
SegmentId: "id",
},
expectedErr: createError(statusMissingSegmentUsersData, localizer.MustLocalizeWithTemplate(locale.RequiredFieldTemplate, "user_data")),
},
{
desc: "ErrExceededMaxSegmentUsersDataSize",
setup: nil,
req: &featureproto.BulkUploadSegmentUsersRequest{
Data: []byte(strings.Repeat("a", maxSegmentUsersDataSize+1)),
EnvironmentId: "ns0",
SegmentId: "id",
},
expectedErr: createError(statusExceededMaxSegmentUsersDataSize, localizer.MustLocalizeWithTemplate(locale.InvalidArgumentError, "user_data_state")),
},
{
desc: "ErrUnknownSegmentUserState",
setup: nil,
req: &featureproto.BulkUploadSegmentUsersRequest{
Data: []byte("data"),
State: featureproto.SegmentUser_State(99),
EnvironmentId: "ns0",
SegmentId: "id",
},
expectedErr: createError(statusUnknownSegmentUserState, localizer.MustLocalizeWithTemplate(locale.InvalidArgumentError, "user_state")),
},
{
desc: "ErrSegmentNotFound",
setup: func(s *FeatureService) {
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().BeginTx(gomock.Any()).Return(nil, nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransaction(
gomock.Any(), gomock.Any(), gomock.Any(),
).Return(v2fs.ErrSegmentNotFound)
},
req: &featureproto.BulkUploadSegmentUsersRequest{
Data: []byte("data"),
State: featureproto.SegmentUser_INCLUDED,
EnvironmentId: "ns0",
SegmentId: "not_found_id",
},
expectedErr: createError(statusSegmentNotFound, localizer.MustLocalize(locale.NotFoundError)),
},
{
desc: "ErrSegmentUsersAlreadyUploading",
setup: func(s *FeatureService) {
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().BeginTx(gomock.Any()).Return(nil, nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransaction(
gomock.Any(), gomock.Any(), gomock.Any(),
).Return(createError(statusSegmentUsersAlreadyUploading, localizer.MustLocalize(locale.SegmentUsersAlreadyUploading)))
},
req: &featureproto.BulkUploadSegmentUsersRequest{
Data: []byte("data"),
State: featureproto.SegmentUser_INCLUDED,
EnvironmentId: "ns0",
SegmentId: "id",
},
expectedErr: createError(statusSegmentUsersAlreadyUploading, localizer.MustLocalize(locale.SegmentUsersAlreadyUploading)),
},
{
desc: "Success",
setup: func(s *FeatureService) {
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().BeginTx(gomock.Any()).Return(nil, nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransaction(
gomock.Any(), gomock.Any(), gomock.Any(),
).Return(nil)
},
req: &featureproto.BulkUploadSegmentUsersRequest{
Data: []byte("data"),
State: featureproto.SegmentUser_INCLUDED,
EnvironmentId: "ns0",
SegmentId: "id",
},
expectedErr: nil,
},
}

for _, tc := range testcases {
t.Run(tc.desc, func(t *testing.T) {
service := createFeatureService(mockController)
if tc.setup != nil {
tc.setup(service)
}
ctx = setToken(ctx)
_, err := service.BulkUploadSegmentUsers(ctx, tc.req)
assert.Equal(t, tc.expectedErr, err)
})
}
}

func TestBulkDownloadSegmentUsersMySQL(t *testing.T) {
t.Parallel()
mockController := gomock.NewController(t)
Expand Down
Loading

0 comments on commit 9f679fe

Please sign in to comment.