From 27d750cd64142997ffde4092ac01a80c5cda9b48 Mon Sep 17 00:00:00 2001 From: HighPon Date: Mon, 18 Nov 2024 04:17:07 +0000 Subject: [PATCH] [WIP] feat: Implenet vald index export --- cmd/index/job/exportation/main.go | 37 ++- cmd/index/job/exportation/sample.yaml | 105 +++++++++ internal/config/index_exporter.go | 30 +++ pkg/index/job/exportation/config/config.go | 66 ++++++ pkg/index/job/exportation/service/exporter.go | 217 ++++++++++++++++++ pkg/index/job/exportation/service/options.go | 55 +++++ .../job/exportation/usecase/exportation.go | 174 ++++++++++++++ 7 files changed, 682 insertions(+), 2 deletions(-) create mode 100644 cmd/index/job/exportation/sample.yaml create mode 100644 internal/config/index_exporter.go create mode 100644 pkg/index/job/exportation/config/config.go create mode 100644 pkg/index/job/exportation/service/exporter.go create mode 100644 pkg/index/job/exportation/service/options.go create mode 100644 pkg/index/job/exportation/usecase/exportation.go diff --git a/cmd/index/job/exportation/main.go b/cmd/index/job/exportation/main.go index c5553f225e0..8c16dd224b3 100644 --- a/cmd/index/job/exportation/main.go +++ b/cmd/index/job/exportation/main.go @@ -13,7 +13,17 @@ // limitations under the License. package main -import "fmt" +import ( + "context" + "log" + + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/info" + "github.com/vdaas/vald/internal/runner" + "github.com/vdaas/vald/internal/safety" + "github.com/vdaas/vald/pkg/index/job/exportation/config" + "github.com/vdaas/vald/pkg/index/job/exportation/usecase" +) const ( maxVersion = "v0.0.10" @@ -22,5 +32,28 @@ const ( ) func main() { - fmt.Println("hello world") + if err := safety.RecoverFunc(func() error { + return runner.Do( + context.Background(), + runner.WithName(name), + runner.WithVersion(info.Version, maxVersion, minVersion), + runner.WithConfigLoader(func(path string) (any, *config.GlobalConfig, error) { + cfg, err := config.NewConfig(path) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to load "+name+"'s configuration") + } + return cfg, &cfg.GlobalConfig, nil + }), + runner.WithDaemonInitializer(func(cfg any) (runner.Runner, error) { + c, ok := cfg.(*config.Data) + if !ok { + return nil, errors.ErrInvalidConfig + } + return usecase.New(c) + }), + ) + })(); err != nil { + log.Fatal(err, info.Get()) + return + } } diff --git a/cmd/index/job/exportation/sample.yaml b/cmd/index/job/exportation/sample.yaml new file mode 100644 index 00000000000..ee932b61b4d --- /dev/null +++ b/cmd/index/job/exportation/sample.yaml @@ -0,0 +1,105 @@ +# +# Copyright (C) 2019-2024 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +version: v0.0.0 +time_zone: JST +logging: + format: raw + level: info + logger: glg +server_config: + servers: + - name: grpc + host: 0.0.0.0 + port: 8081 + grpc: + bidirectional_stream_concurrency: 20 + connection_timeout: "" + header_table_size: 0 + initial_conn_window_size: 0 + initial_window_size: 0 + interceptors: [] + keepalive: + max_conn_age: "" + max_conn_age_grace: "" + max_conn_idle: "" + time: "" + timeout: "" + max_header_list_size: 0 + max_receive_message_size: 0 + max_send_message_size: 0 + read_buffer_size: 0 + write_buffer_size: 0 + mode: GRPC + probe_wait_time: 3s + restart: true + health_check_servers: + - name: readiness + host: 0.0.0.0 + port: 3001 + http: + handler_timeout: "" + idle_timeout: "" + read_header_timeout: "" + read_timeout: "" + shutdown_duration: 0s + write_timeout: "" + mode: "" + probe_wait_time: 3s + metrics_servers: + startup_strategy: + - grpc + - readiness + full_shutdown_duration: 600s + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key +exporter: + concurrency: 1 + kvs_background_sync_interval: 5s + kvs_background_compaction_interval: 5s +observability: + enabled: false + otlp: + collector_endpoint: "otel-collector.monitoring.svc.cluster.local:4317" + trace_batch_timeout: "1s" + trace_export_timeout: "1m" + trace_max_export_batch_size: 1024 + trace_max_queue_size: 256 + metrics_export_interval: "1s" + metrics_export_timeout: "1m" + attribute: + namespace: "_MY_POD_NAMESPACE_" + pod_name: "_MY_POD_NAME_" + node_name: "_MY_NODE_NAME_" + service_name: "vald-index-deletion" + metrics: + enable_cgo: true + enable_goroutine: true + enable_memory: true + enable_version_info: true + version_info_labels: + - vald_version + - server_name + - git_commit + - build_time + - go_version + - go_os + - go_arch + - algorithm_info + trace: + enabled: true diff --git a/internal/config/index_exporter.go b/internal/config/index_exporter.go new file mode 100644 index 00000000000..173184f421a --- /dev/null +++ b/internal/config/index_exporter.go @@ -0,0 +1,30 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package config + +// IndexExporter represents the configurations for index exportation. +type IndexExporter struct { + // Concurrency represents indexing concurrency. + Concurrency int `json:"concurrency" yaml:"concurrency"` + + // KVSBackgroundSyncInterval represents interval for checked id list kvs sync duration + KVSBackgroundSyncInterval string `json:"kvs_background_sync_interval" yaml:"kvs_background_sync_interval"` + + // KVSBackgroundCompactionInterval represents interval for checked id list kvs compaction duration + KVSBackgroundCompactionInterval string `json:"kvs_background_compaction_interval" yaml:"kvs_background_compaction_interval"` +} + +func (ic *IndexExporter) Bind() *IndexExporter { + return ic +} diff --git a/pkg/index/job/exportation/config/config.go b/pkg/index/job/exportation/config/config.go new file mode 100644 index 00000000000..5e7d38ad912 --- /dev/null +++ b/pkg/index/job/exportation/config/config.go @@ -0,0 +1,66 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package config + +import ( + "github.com/vdaas/vald/internal/config" + "github.com/vdaas/vald/internal/errors" +) + +// GlobalConfig is a type alias of config.GlobalConfig representing application base configurations. +type GlobalConfig = config.GlobalConfig + +// Data represents the application configurations. +type Data struct { + // GlobalConfig represents application base configurations. + config.GlobalConfig `json:",inline" yaml:",inline"` + + // Server represent all server configurations + Server *config.Servers `json:"server_config" yaml:"server_config"` + + // Observability represents observability configurations. + Observability *config.Observability `json:"observability" yaml:"observability"` + + // Deletion represents auto indexing service configurations. + Exporter *config.IndexExporter `json:"exporter" yaml:"exporter"` +} + +// NewConfig load configurations from file path. +func NewConfig(path string) (cfg *Data, err error) { + cfg = new(Data) + + if err = config.Read(path, &cfg); err != nil { + return nil, err + } + + if cfg != nil { + _ = cfg.GlobalConfig.Bind() + } else { + return nil, errors.ErrInvalidConfig + } + + if cfg.Server != nil { + _ = cfg.Server.Bind() + } else { + return nil, errors.ErrInvalidConfig + } + + if cfg.Observability != nil { + _ = cfg.Observability.Bind() + } else { + cfg.Observability = new(config.Observability).Bind() + } + + return cfg, nil +} diff --git a/pkg/index/job/exportation/service/exporter.go b/pkg/index/job/exportation/service/exporter.go new file mode 100644 index 00000000000..f3f35cbeae1 --- /dev/null +++ b/pkg/index/job/exportation/service/exporter.go @@ -0,0 +1,217 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package service + +import ( + "context" + "io" + "os" + "reflect" + "time" + + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/apis/grpc/v1/vald" + vc "github.com/vdaas/vald/internal/client/v1/client/vald" + "github.com/vdaas/vald/internal/db/kvs/pogreb" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/file" + "github.com/vdaas/vald/internal/log" + igrpc "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/observability/trace" + "github.com/vdaas/vald/internal/safety" + "github.com/vdaas/vald/internal/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + apiName = "vald/index/job/export" + grpcMethodName = "vald.v1.StreamListObject/" + vald.StreamListObjectRPCName +) + +// Exporter represents an interface for exporting. +type Exporter interface { + StartClient(ctx context.Context) (<-chan error, error) + Start(ctx context.Context) error +} + +var defaultOpts = []Option{ + WithStreamListConcurrency(1), +} + +type export struct { + eg errgroup.Group + gateway vc.Client + storedVector pogreb.DB + + streamListConcurrency int + backgroundSyncInterval time.Duration + backgroundCompactionInterval time.Duration +} + +// New returns Exporter object if no error occurs. +func New(opts ...Option) (Exporter, error) { + e := new(export) + for _, opt := range append(defaultOpts, opts...) { + if err := opt(e); err != nil { + oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt)) + e := &errors.ErrCriticalOption{} + if errors.As(oerr, &e) { + log.Error(err) + return nil, oerr + } + log.Warn(oerr) + } + } + + // Todo: Determine file path + dir := file.Join("TBD/", "checked") + // Todo: Check file permission + if err := file.MkdirAll(dir, os.ModePerm); err != nil { + log.Errorf("failed to create dir %s", dir) + return nil, err + } + // Todo: Determine file name + path := file.Join(dir, "test.db") + db, err := pogreb.New(pogreb.WithPath(path), + pogreb.WithBackgroundCompactionInterval(e.backgroundCompactionInterval), + pogreb.WithBackgroundSyncInterval(e.backgroundSyncInterval)) + if err != nil { + log.Errorf("failed to open checked List kvs DB %s", path) + return nil, err + } + e.storedVector = db + return e, nil +} + +// StartClient starts the gRPC client. +func (e *export) StartClient(ctx context.Context) (<-chan error, error) { + ech := make(chan error, 2) + gch, err := e.gateway.Start(ctx) + if err != nil { + return nil, err + } + e.eg.Go(safety.RecoverFunc(func() (err error) { + defer close(ech) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err = <-gch: + } + if err != nil { + select { + case <-ctx.Done(): + return ctx.Err() + case ech <- err: + } + } + } + })) + return ech, nil +} + +func (e *export) Start(ctx context.Context) error { + err := e.doExportIndex(ctx, + func(ctx context.Context, rc vald.ObjectClient, copts ...grpc.CallOption) (vald.Object_StreamListObjectClient, error) { + return rc.StreamListObject(ctx, &payload.Object_List_Request{}, copts...) + }, + ) + return err +} + +func (e *export) doExportIndex( + ctx context.Context, + fn func(ctx context.Context, rc vald.ObjectClient, copts ...grpc.CallOption) (vald.Object_StreamListObjectClient, error), +) (errs error) { + ctx, span := trace.StartSpan(igrpc.WrapGRPCMethod(ctx, grpcMethodName), apiName+"/service/index.doExportIndex") + defer func() { + if span != nil { + span.End() + } + }() + + emptyReq := new(payload.Object_List_Request) + + eg, egctx := errgroup.WithContext(ctx) + eg.SetLimit(e.streamListConcurrency) + ctx, cancel := context.WithCancelCause(egctx) + gatewayAddrs := e.gateway.GRPCClient().ConnectedAddrs() + if len(gatewayAddrs) < 0 { + log.Errorf("Active gateway is not found.: %v ", ctx.Err()) + } + + conn, err := grpc.NewClient(gatewayAddrs[0], grpc.WithTransportCredentials(insecure.NewCredentials())) + stream, err := vc.NewValdClient(conn).StreamListObject(ctx, emptyReq, nil) + if err != nil || stream == nil { + return err + } + log.Infof("starting exporter to %s", gatewayAddrs[0]) + + for { + select { + case <-ctx.Done(): + if !errors.Is(ctx.Err(), context.Canceled) { + log.Errorf("context done unexpectedly: %v", ctx.Err()) + } + if !errors.Is(context.Cause(ctx), io.EOF) { + log.Errorf("context canceled due to %v", ctx.Err()) + } + err = eg.Wait() + if err != nil { + log.Errorf("exporter returned error status errgroup returned error: %v", ctx.Err()) + } else { + log.Infof("exporter finished") + } + return nil + default: + res, err := stream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + cancel(io.EOF) + } else { + cancel(errors.ErrStreamListObjectStreamFinishedUnexpectedly(err)) + } + } else if res != nil && res.GetVector() != nil && res.GetVector().GetId() != "" { + eg.Go(safety.RecoverFunc(func() (err error) { + objVec := res.GetVector() + ts := objVec.GetTimestamp() + id := objVec.GetId() + + storedBinary, ok, err := e.storedVector.Get(id) + if err != nil { + log.Errorf("failed to perform Get from check list but still try to finish processing without cache: %v", err) + } + + var storedObjVec *payload.Object_Vector + if ok { + if err := storedObjVec.UnmarshalVT(storedBinary); err != nil { + log.Errorf("failed to Unmarshal proto to payload.Object_Vector: %v", err) + } + } + + isUpsertVector := !ok || storedObjVec.GetTimestamp() < ts + if isUpsertVector { + protoData, err := objVec.MarshalVT() + if err != nil { + panic(err) + } + e.storedVector.Set(id, protoData) + } + return nil + })) + } + } + } +} diff --git a/pkg/index/job/exportation/service/options.go b/pkg/index/job/exportation/service/options.go new file mode 100644 index 00000000000..85ea7481f71 --- /dev/null +++ b/pkg/index/job/exportation/service/options.go @@ -0,0 +1,55 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package service + +import ( + "time" + + "github.com/vdaas/vald/internal/errors" +) + +type Option func(_ *export) error + +// WithStreamListConcurrency returns Option that sets streamListConcurrency. +func WithStreamListConcurrency(num int) Option { + return func(e *export) error { + if num <= 0 { + return errors.NewErrInvalidOption("streamListConcurrency", num) + } + e.streamListConcurrency = num + return nil + } +} + +// WithBackgroundSyncInterval returns Option that sets backgroundSyncInterval. +func WithBackgroundSyncInterval(t time.Duration) Option { + return func(e *export) error { + if t <= 0 { + return errors.NewErrInvalidOption("backgroundSyncInterval", t) + } + e.backgroundSyncInterval = t + return nil + } +} + +// WithBackgroundCompactionInterval returns Option that sets backgroundCompactionInterval. +func WithBackgroundCompactionInterval(t time.Duration) Option { + return func(e *export) error { + if t <= 0 { + return errors.NewErrInvalidOption("backgroundCompactionInterval", t) + } + e.backgroundCompactionInterval = t + return nil + } +} diff --git a/pkg/index/job/exportation/usecase/exportation.go b/pkg/index/job/exportation/usecase/exportation.go new file mode 100644 index 00000000000..3077f72b2a7 --- /dev/null +++ b/pkg/index/job/exportation/usecase/exportation.go @@ -0,0 +1,174 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package usecase + +import ( + "context" + "os" + "syscall" + + iconfig "github.com/vdaas/vald/internal/config" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/net/grpc/interceptor/server/recover" + "github.com/vdaas/vald/internal/observability" + "github.com/vdaas/vald/internal/runner" + "github.com/vdaas/vald/internal/safety" + "github.com/vdaas/vald/internal/servers/server" + "github.com/vdaas/vald/internal/servers/starter" + "github.com/vdaas/vald/internal/sync/errgroup" + "github.com/vdaas/vald/pkg/index/job/exportation/config" + "github.com/vdaas/vald/pkg/index/job/exportation/service" +) + +type run struct { + eg errgroup.Group + cfg *config.Data + observability observability.Observability + server starter.Server + exporter service.Exporter +} + +// New returns Runner instance. +func New(cfg *config.Data) (_ runner.Runner, err error) { + eg := errgroup.Get() + + exporter, err := service.New( + service.WithStreamListConcurrency(cfg.Exporter.Concurrency), + ) + if err != nil { + return nil, err + } + + srv, err := starter.New( + starter.WithConfig(cfg.Server), + starter.WithGRPC(func(cfg *iconfig.Server) []server.Option { + return []server.Option{ + server.WithGRPCOption( + grpc.ChainUnaryInterceptor(recover.RecoverInterceptor()), + grpc.ChainStreamInterceptor(recover.RecoverStreamInterceptor()), + ), + } + }), + ) + if err != nil { + return nil, err + } + + var obs observability.Observability + if cfg.Observability.Enabled { + obs, err = observability.NewWithConfig( + cfg.Observability, + ) + if err != nil { + return nil, err + } + } + + return &run{ + eg: eg, + cfg: cfg, + observability: obs, + server: srv, + exporter: exporter, + }, nil +} + +// PreStart is a method called before execution of Start, and it invokes the PreStart method of observability. +func (r *run) PreStart(ctx context.Context) error { + if r.observability != nil { + return r.observability.PreStart(ctx) + } + return nil +} + +// Start is a method used to initiate an operation in the run, and it returns a channel for receiving errors +// during the operation and an error representing any initialization errors. +func (r *run) Start(ctx context.Context) (<-chan error, error) { + ech := make(chan error, 4) + var sech, oech <-chan error + if r.observability != nil { + oech = r.observability.Start(ctx) + } + sech = r.server.ListenAndServe(ctx) + cech, err := r.exporter.StartClient(ctx) + if err != nil { + close(ech) + return nil, err + } + + r.eg.Go(safety.RecoverFunc(func() (err error) { + defer func() { + p, err := os.FindProcess(os.Getpid()) + if err != nil { + // using Fatal to avoid this process to be zombie + // skipcq: RVV-A0003 + log.Fatalf("failed to find my pid to kill %v", err) + return + } + log.Info("sending SIGTERM to myself to stop this job") + if err := p.Signal(syscall.SIGTERM); err != nil { + log.Error(err) + } + }() + return r.exporter.Start(ctx) + })) + + r.eg.Go(safety.RecoverFunc(func() (err error) { + defer close(ech) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err = <-oech: + case err = <-sech: + case err = <-cech: + } + if err != nil { + select { + case <-ctx.Done(): + return errors.Join(ctx.Err(), err) + case ech <- err: + } + } + } + })) + return ech, nil +} + +// PreStop is a method called before execution of Stop. +func (*run) PreStop(_ context.Context) error { + return nil +} + +// Stop is a method used to stop an operation in the run. +func (r *run) Stop(ctx context.Context) (errs error) { + if r.observability != nil { + if err := r.observability.Stop(ctx); err != nil { + errs = errors.Join(errs, err) + } + } + if r.server != nil { + if err := r.server.Shutdown(ctx); err != nil { + errs = errors.Join(errs, err) + } + } + return errs +} + +// PostStop is a method called after execution of Stop. +func (*run) PostStop(_ context.Context) error { + return nil +}