Skip to content

Commit

Permalink
Fix index job logic to pass DNS A record (#2438)
Browse files Browse the repository at this point in the history
* fix: choose user given target address in priority

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: deleted unnecessary code

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: add logic to connect the user given target addresses

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: add info log about new connection target

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: indent of configuration data

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: indent bug

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: deepsource warning

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

---------

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>
  • Loading branch information
hlts2 authored and vdaas-ci committed Mar 12, 2024
1 parent 05a8c2f commit 2742379
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 124 deletions.
7 changes: 6 additions & 1 deletion charts/vald/templates/index/job/creation/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ data:
agent_namespace: {{ $creator.agent_namespace | quote }}
node_name: {{ $creator.node_name | quote }}
concurrency: {{ $creator.concurrency }}
target_addrs: {{ $creator.target_addrs }}
{{- if $creator.target_addrs }}
target_addrs:
{{- toYaml $creator.target_addrs | nindent 8 }}
{{- else }}
target_addrs: []
{{- end }}
discoverer:
duration: {{ $creator.discoverer.duration }}
client:
Expand Down
7 changes: 6 additions & 1 deletion charts/vald/templates/index/job/save/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ data:
agent_namespace: {{ $saver.agent_namespace | quote }}
node_name: {{ $saver.node_name | quote }}
concurrency: {{ $saver.concurrency }}
target_addrs: {{ $saver.target_addrs }}
{{- if $saver.target_addrs }}
target_addrs:
{{- toYaml $saver.target_addrs | nindent 8 }}
{{- else }}
target_addrs: []
{{- end }}
discoverer:
duration: {{ $saver.discoverer.duration }}
client:
Expand Down
49 changes: 24 additions & 25 deletions pkg/index/job/creation/service/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ type Indexer interface {
}

type index struct {
client discoverer.Client
targetAddrs []string
targetAddrList map[string]bool
client discoverer.Client
targetAddrs []string

creationPoolSize uint32
concurrency int
Expand All @@ -64,13 +63,23 @@ func New(opts ...Option) (Indexer, error) {
log.Warn(oerr)
}
}
idx.targetAddrList = make(map[string]bool, len(idx.targetAddrs))
for _, addr := range idx.targetAddrs {
idx.targetAddrList[addr] = true
}
idx.targetAddrs = delDuplicateAddrs(idx.targetAddrs)

Check warning on line 66 in pkg/index/job/creation/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/creation/service/indexer.go#L66

Added line #L66 was not covered by tests
return idx, nil
}

func delDuplicateAddrs(targetAddrs []string) []string {
addrs := make([]string, 0, len(targetAddrs))
exist := make(map[string]bool)

for _, addr := range targetAddrs {
if !exist[addr] {
addrs = append(addrs, addr)
exist[addr] = true
}

Check warning on line 78 in pkg/index/job/creation/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/creation/service/indexer.go#L70-L78

Added lines #L70 - L78 were not covered by tests
}
return addrs

Check warning on line 80 in pkg/index/job/creation/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/creation/service/indexer.go#L80

Added line #L80 was not covered by tests
}

// StartClient starts the gRPC client.
func (idx *index) StartClient(ctx context.Context) (<-chan error, error) {
return idx.client.Start(ctx)
Expand Down Expand Up @@ -126,6 +135,7 @@ func (idx *index) Start(ctx context.Context) error {
return nil
}

// skipcq: GO-R1005
func (idx *index) doCreateIndex(ctx context.Context, fn func(_ context.Context, _ agent.AgentClient, _ ...grpc.CallOption) (*payload.Empty, error)) (errs error) {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, grpcMethodName), apiName+"/service/index.doCreateIndex")
defer func() {
Expand All @@ -136,12 +146,14 @@ func (idx *index) doCreateIndex(ctx context.Context, fn func(_ context.Context,

targetAddrs := idx.client.GetAddrs(ctx)
if len(idx.targetAddrs) != 0 {
targetAddrs = idx.extractTargetAddrs(targetAddrs)

// If targetAddrs is empty, an invalid target addresses may be registered in targetAddrList.
if len(targetAddrs) == 0 {
return errors.ErrGRPCTargetAddrNotFound
// If target addresses is specified, that addresses are used in priority.
for _, addr := range idx.targetAddrs {
log.Infof("connect to target agent (%s)", addr)
if _, err := idx.client.GetClient().Connect(ctx, addr); err != nil {
return err
}

Check warning on line 154 in pkg/index/job/creation/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/creation/service/indexer.go#L153-L154

Added lines #L153 - L154 were not covered by tests
}
targetAddrs = idx.targetAddrs
}
log.Infof("target agent addrs: %v", targetAddrs)

Expand Down Expand Up @@ -207,16 +219,3 @@ func (idx *index) doCreateIndex(ctx context.Context, fn func(_ context.Context,
)
return errors.Join(err, errs)
}

// extractTargetAddresses filters and extracts target addresses registered in targetAddrList from the given address list.
func (idx *index) extractTargetAddrs(addrs []string) []string {
res := make([]string, 0, len(addrs))
for _, addr := range addrs {
if !idx.targetAddrList[addr] {
log.Warnf("the gRPC target address not found: %s", addr)
} else {
res = append(res, addr)
}
}
return res
}
62 changes: 30 additions & 32 deletions pkg/index/job/creation/service/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net/grpc"
"github.com/vdaas/vald/internal/net/grpc/codes"
"github.com/vdaas/vald/internal/net/grpc/pool"
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/test/goleak"
clientmock "github.com/vdaas/vald/internal/test/mock/client"
Expand All @@ -36,7 +37,6 @@ func Test_index_Start(t *testing.T) {
type fields struct {
client discoverer.Client
targetAddrs []string
targetAddrList map[string]bool
creationPoolSize uint32
concurrency int
}
Expand Down Expand Up @@ -68,7 +68,6 @@ func Test_index_Start(t *testing.T) {
args: args{
ctx: context.Background(),
},

fields: fields{
client: &clientmock.DiscovererClientMock{
GetAddrsFunc: func(_ context.Context) []string {
Expand All @@ -88,49 +87,46 @@ func Test_index_Start(t *testing.T) {
}
}(),
func() test {
addrs := []string{
"127.0.0.1:8080",
}
return test{
name: "Fail: when there is an error wrapped with gRPC status in the indexing request process",
name: "Success: when a target addresses (targetAddrs) is given and there are no errors in the indexing request process",
args: args{
ctx: context.Background(),
},
fields: fields{
targetAddrs: []string{
"127.0.0.1:8080",
"127.0.0.1:8081",
"127.0.0.1:8083",
},
client: &clientmock.DiscovererClientMock{
GetAddrsFunc: func(_ context.Context) []string {
return addrs
return nil
},
GetClientFunc: func() grpc.Client {
return &grpcmock.GRPCClientMock{
OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int,
_ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error,
) error {
return status.WrapWithInternal(
agent.CreateIndexRPCName+" API connection not found",
errors.ErrGRPCClientConnNotFound("*"),
)
return nil
},
ConnectFunc: func(_ context.Context, _ string, _ ...grpc.DialOption) (pool.Conn, error) {
return nil, nil
},
}
},
},
},
want: want{
err: status.Error(codes.Internal,
agent.CreateIndexRPCName+" API connection not found"),
},
}
}(),
func() test {
addrs := []string{
"127.0.0.1:8080",
}
return test{
name: "Fail: When the OrderedRangeConcurrent method returns a gRPC client conn not found error",
name: "Fail: when there is an error wrapped with gRPC status in the indexing request process",
args: args{
ctx: context.Background(),
},

fields: fields{
client: &clientmock.DiscovererClientMock{
GetAddrsFunc: func(_ context.Context) []string {
Expand All @@ -141,7 +137,10 @@ func Test_index_Start(t *testing.T) {
OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int,
_ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error,
) error {
return errors.ErrGRPCClientConnNotFound("*")
return status.WrapWithInternal(
agent.CreateIndexRPCName+" API connection not found",
errors.ErrGRPCClientConnNotFound("*"),
)
},
}
},
Expand All @@ -154,29 +153,33 @@ func Test_index_Start(t *testing.T) {
}
}(),
func() test {
targetAddrs := []string{
addrs := []string{
"127.0.0.1:8080",
}
targetAddrList := map[string]bool{
targetAddrs[0]: true,
}
return test{
name: "Fail: when there is no address matching targetAddrList",
name: "Fail: When the OrderedRangeConcurrent method returns a gRPC client conn not found error",
args: args{
ctx: context.Background(),
},
fields: fields{
client: &clientmock.DiscovererClientMock{
GetAddrsFunc: func(_ context.Context) []string {
return nil
return addrs
},
GetClientFunc: func() grpc.Client {
return &grpcmock.GRPCClientMock{
OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int,
_ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error,
) error {
return errors.ErrGRPCClientConnNotFound("*")
},
}
},
},
targetAddrs: targetAddrs,
targetAddrList: targetAddrList,
},
want: want{
err: status.Error(codes.Internal,
agent.CreateIndexRPCName+" API connection target address \"127.0.0.1:8080\" not found"),
agent.CreateIndexRPCName+" API connection not found"),
},
}
}(),
Expand All @@ -200,7 +203,6 @@ func Test_index_Start(t *testing.T) {
idx := &index{
client: test.fields.client,
targetAddrs: test.fields.targetAddrs,
targetAddrList: test.fields.targetAddrList,
creationPoolSize: test.fields.creationPoolSize,
concurrency: test.fields.concurrency,
}
Expand Down Expand Up @@ -312,7 +314,6 @@ func Test_index_Start(t *testing.T) {
// type fields struct {
// client discoverer.Client
// targetAddrs []string
// targetAddrList map[string]bool
// creationPoolSize uint32
// concurrency int
// }
Expand Down Expand Up @@ -349,7 +350,6 @@ func Test_index_Start(t *testing.T) {
// fields: fields {
// client:nil,
// targetAddrs:nil,
// targetAddrList:nil,
// creationPoolSize:0,
// concurrency:0,
// },
Expand All @@ -375,7 +375,6 @@ func Test_index_Start(t *testing.T) {
// fields: fields {
// client:nil,
// targetAddrs:nil,
// targetAddrList:nil,
// creationPoolSize:0,
// concurrency:0,
// },
Expand Down Expand Up @@ -410,7 +409,6 @@ func Test_index_Start(t *testing.T) {
// idx := &index{
// client: test.fields.client,
// targetAddrs: test.fields.targetAddrs,
// targetAddrList: test.fields.targetAddrList,
// creationPoolSize: test.fields.creationPoolSize,
// concurrency: test.fields.concurrency,
// }
Expand Down
50 changes: 24 additions & 26 deletions pkg/index/job/save/service/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,15 @@ type Indexer interface {
}

type index struct {
client discoverer.Client
targetAddrs []string
targetAddrList map[string]bool
client discoverer.Client
targetAddrs []string

concurrency int
}

// New returns Indexer object if no error occurs.
func New(opts ...Option) (Indexer, error) {
idx := new(index)
idx := &index{}

Check warning on line 53 in pkg/index/job/save/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/save/service/indexer.go#L53

Added line #L53 was not covered by tests
for _, opt := range append(defaultOpts, opts...) {
if err := opt(idx); err != nil {
oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt))
Expand All @@ -63,13 +62,23 @@ func New(opts ...Option) (Indexer, error) {
log.Warn(oerr)
}
}
idx.targetAddrList = make(map[string]bool, len(idx.targetAddrs))
for _, addr := range idx.targetAddrs {
idx.targetAddrList[addr] = true
}
idx.targetAddrs = delDuplicateAddrs(idx.targetAddrs)

Check warning on line 65 in pkg/index/job/save/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/save/service/indexer.go#L65

Added line #L65 was not covered by tests
return idx, nil
}

func delDuplicateAddrs(targetAddrs []string) []string {
addrs := make([]string, 0, len(targetAddrs))
exist := make(map[string]bool)

for _, addr := range targetAddrs {
if !exist[addr] {
addrs = append(addrs, addr)
exist[addr] = true
}

Check warning on line 77 in pkg/index/job/save/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/save/service/indexer.go#L69-L77

Added lines #L69 - L77 were not covered by tests
}
return addrs

Check warning on line 79 in pkg/index/job/save/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/save/service/indexer.go#L79

Added line #L79 was not covered by tests
}

// StartClient starts the gRPC client.
func (idx *index) StartClient(ctx context.Context) (<-chan error, error) {
return idx.client.Start(ctx)
Expand Down Expand Up @@ -123,6 +132,7 @@ func (idx *index) Start(ctx context.Context) error {
return nil
}

// skipcq: GO-R1005
func (idx *index) doSaveIndex(ctx context.Context, fn func(_ context.Context, _ agent.AgentClient, _ ...grpc.CallOption) (*payload.Empty, error)) (errs error) {
ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, grpcMethodName), apiName+"/service/index.doSaveIndex")
defer func() {
Expand All @@ -133,12 +143,13 @@ func (idx *index) doSaveIndex(ctx context.Context, fn func(_ context.Context, _

targetAddrs := idx.client.GetAddrs(ctx)
if len(idx.targetAddrs) != 0 {
targetAddrs = idx.extractTargetAddrs(targetAddrs)

// If targetAddrs is empty, an invalid target addresses may be registered in targetAddrList.
if len(targetAddrs) == 0 {
return errors.ErrGRPCTargetAddrNotFound
// If target addresses is specified, that addresses are used in priority.
for _, addr := range idx.targetAddrs {
if _, err := idx.client.GetClient().Connect(ctx, addr); err != nil {
return err
}

Check warning on line 150 in pkg/index/job/save/service/indexer.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/job/save/service/indexer.go#L149-L150

Added lines #L149 - L150 were not covered by tests
}
targetAddrs = idx.targetAddrs
}
log.Infof("target agent addrs: %v", targetAddrs)

Expand Down Expand Up @@ -200,16 +211,3 @@ func (idx *index) doSaveIndex(ctx context.Context, fn func(_ context.Context, _
)
return errors.Join(err, errs)
}

// extractTargetAddresses filters and extracts target addresses registered in targetAddrList from the given address list.
func (idx *index) extractTargetAddrs(addrs []string) []string {
res := make([]string, 0, len(addrs))
for _, addr := range addrs {
if !idx.targetAddrList[addr] {
log.Warnf("the gRPC target address not found: %s", addr)
} else {
res = append(res, addr)
}
}
return res
}
Loading

0 comments on commit 2742379

Please sign in to comment.