Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ykadowak committed Mar 6, 2024
1 parent b5bee03 commit f24440e
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 17 deletions.
2 changes: 2 additions & 0 deletions charts/vald/templates/index/operator/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,6 @@ data:
agent_name: {{ $agent.name }}
agent_namespace: {{ $agent.namespace }}
concurrency: 1
read_replica_enabled: {{ $agent.readreplica.enabled }}
read_replica_label_key: {{ $agent.readreplica.label_key }}
{{- end }}
2 changes: 2 additions & 0 deletions cmd/index/operator/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ operator:
agent_name: "vald-agent"
agent_namespace: "default"
concurrency: 1
read_replica_enabled: true
read_replica_label_key: "vald-readreplica-id"
observability:
enabled: false
otlp:
Expand Down
6 changes: 6 additions & 0 deletions internal/config/index_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ type IndexOperator struct {

// Concurrency represents indexing concurrency.
Concurrency int `json:"concurrency" yaml:"concurrency"`

// ReadReplicaEnabled represents whether read replica is enabled or not.
ReadReplicaEnabled bool `json:"read_replica_enabled" yaml:"read_replica_enabled"`

// ReadReplicaLabelKey represents the label key for read replica.
ReadReplicaLabelKey string `json:"read_replica_label_key" yaml:"read_replica_label_key"`
}

func (ic *IndexOperator) Bind() *IndexOperator {
Expand Down
43 changes: 27 additions & 16 deletions pkg/index/operator/service/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/k8s"
"github.com/vdaas/vald/internal/k8s/client"
"github.com/vdaas/vald/internal/k8s/job"
"github.com/vdaas/vald/internal/k8s/pod"
"github.com/vdaas/vald/internal/k8s/vald"
Expand All @@ -44,9 +45,12 @@ type Operator interface {
}

type operator struct {
ctrl k8s.Controller
eg errgroup.Group
namespace string
ctrl k8s.Controller
eg errgroup.Group
namespace string
client client.Client
readReplicaEnabled bool
readReplicaLabelKey string
}

// New returns Indexer object if no error occurs.
Expand Down Expand Up @@ -95,6 +99,13 @@ func New(agentName string, opts ...Option) (o Operator, err error) {
if err != nil {
return nil, err
}

client, err := client.New()
if err != nil {
return nil, err
}
operator.client = client

Check warning on line 108 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L103-L108

Added lines #L103 - L108 were not covered by tests
return operator, nil
}

Expand Down Expand Up @@ -131,29 +142,30 @@ func (o *operator) Start(ctx context.Context) (<-chan error, error) {

// TODO: implement agent pod reconcile logic to detect conditions to start indexing and saving.
func (o *operator) podOnReconcile(ctx context.Context, podList map[string][]pod.Pod) {
client := o.ctrl.GetManager().GetClient()
for k, v := range podList {
for _, pod := range v {
log.Debug("key", k, "name:", pod.Name, "annotations:", pod.Annotations)

// rotate read replica if needed
if err := o.rotateIfNeeded(ctx, client, pod); err != nil {
log.Error(err)
if o.readReplicaEnabled {
if err := o.rotateIfNeeded(ctx, pod); err != nil {
log.Error(err)
}

Check warning on line 153 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L148-L153

Added lines #L148 - L153 were not covered by tests
}
}
}
}

// TODO: implement job reconcile logic to detect save job completion and to start rotation.
func (o *operator) jobOnReconcile(ctx context.Context, jobList map[string][]job.Job) {
for k, v := range jobList {
for _, job := range v {
log.Debug("key", k, "name:", job.Name, "status:", job.Status)
}
}
// for k, v := range jobList {
// for _, job := range v {
// log.Debug("key", k, "name:", job.Name, "status:", job.Status)
// }
// }

Check warning on line 165 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L161-L165

Added lines #L161 - L165 were not covered by tests
}

func (o *operator) rotateIfNeeded(ctx context.Context, client crclient.Client, pod pod.Pod) error {
func (o *operator) rotateIfNeeded(ctx context.Context, pod pod.Pod) error {
t, ok := pod.Annotations[vald.LastTimeSaveIndexTimestampAnnotationsKey]
if !ok {
log.Info("the agent pod has not saved index yet. skipping...")
Expand All @@ -169,11 +181,10 @@ func (o *operator) rotateIfNeeded(ctx context.Context, client crclient.Client, p
log.Info("no index label found. the agent is not StatefulSet? skipping...")
return nil
}

Check warning on line 183 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L179-L183

Added lines #L179 - L183 were not covered by tests
// FIXME: get the key from config
label := crclient.MatchingLabels(map[string]string{"vald-readreplica-id": podIdx})

var depList appsv1.DeploymentList
if err := client.List(ctx, &depList, label); err != nil {
label := crclient.MatchingLabels(map[string]string{o.readReplicaLabelKey: podIdx})
if err := o.client.List(ctx, &depList, label); err != nil {
return err
}
if len(depList.Items) == 0 {
Expand All @@ -194,7 +205,7 @@ func (o *operator) rotateIfNeeded(ctx context.Context, client crclient.Client, p
return nil

Check warning on line 205 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L203-L205

Added lines #L203 - L205 were not covered by tests
}
}
log.Info("rotation required. creating rotator job...")
log.Infof("rotation required for agent id: %s. creating rotator job...", podIdx)
// TODO: check if the rotator job already exists or queued
return nil

Check warning on line 210 in pkg/index/operator/service/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/operator.go#L208-L210

Added lines #L208 - L210 were not covered by tests
}
14 changes: 14 additions & 0 deletions pkg/index/operator/service/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,17 @@ func WithErrGroup(eg errgroup.Group) Option {
return nil
}
}

func WithReadReplicaEnabled(enabled bool) Option {
return func(o *operator) error {
o.readReplicaEnabled = enabled
return nil
}

Check warning on line 38 in pkg/index/operator/service/options.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/options.go#L34-L38

Added lines #L34 - L38 were not covered by tests
}

func WithReadReplicaLabelKey(key string) Option {
return func(o *operator) error {
o.readReplicaLabelKey = key
return nil
}

Check warning on line 45 in pkg/index/operator/service/options.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/service/options.go#L41-L45

Added lines #L41 - L45 were not covered by tests
}
6 changes: 5 additions & 1 deletion pkg/index/operator/usecase/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ type run struct {
// New returns Runner instance.
func New(cfg *config.Data) (_ runner.Runner, err error) {
eg := errgroup.Get()
operator, err := service.New(cfg.Operator.AgentName)
operator, err := service.New(
cfg.Operator.AgentName,
service.WithReadReplicaEnabled(cfg.Operator.ReadReplicaEnabled),
service.WithReadReplicaLabelKey(cfg.Operator.ReadReplicaLabelKey),
)

Check warning on line 48 in pkg/index/operator/usecase/operator.go

View check run for this annotation

Codecov / codecov/patch

pkg/index/operator/usecase/operator.go#L44-L48

Added lines #L44 - L48 were not covered by tests
if err != nil {
return nil, err
}
Expand Down

0 comments on commit f24440e

Please sign in to comment.