From 074b176934acfac591dc41d76cd3a8cdda229d97 Mon Sep 17 00:00:00 2001 From: "vi.trinhthe" Date: Fri, 7 Jul 2023 10:06:26 +0700 Subject: [PATCH] support redis db --- core/redis-datastore/redis.go | 3 ++- core/redis-statestore/redis.go | 3 ++- dashboard/service.go | 27 +++++++++++++++++++++++---- runtime/flow_runtime.go | 17 +++++++++-------- runtime/init_data_store.go | 4 ++-- runtime/init_state_store.go | 4 ++-- v1/goflow.go | 8 ++++++++ 7 files changed, 48 insertions(+), 18 deletions(-) diff --git a/core/redis-datastore/redis.go b/core/redis-datastore/redis.go index 5eedbc5..bcbdd47 100644 --- a/core/redis-datastore/redis.go +++ b/core/redis-datastore/redis.go @@ -13,11 +13,12 @@ type RedisDataStore struct { redisClient redis.UniversalClient } -func GetRedisDataStore(redisUri string, password string) (sdk.DataStore, error) { +func GetRedisDataStore(redisUri string, password string, db int) (sdk.DataStore, error) { ds := &RedisDataStore{} client := redis.NewClient(&redis.Options{ Addr: redisUri, Password: password, + DB: db, }) err := client.Ping().Err() if err != nil { diff --git a/core/redis-statestore/redis.go b/core/redis-statestore/redis.go index a8a6e60..3098a1f 100644 --- a/core/redis-statestore/redis.go +++ b/core/redis-statestore/redis.go @@ -19,12 +19,13 @@ type Incrementer interface { Incr(key string, value int64) (int64, error) } -func GetRedisStateStore(redisUri string, password string) (sdk.StateStore, error) { +func GetRedisStateStore(redisUri string, password string, db int) (sdk.StateStore, error) { stateStore := &RedisStateStore{} client := redis.NewClient(&redis.Options{ Addr: redisUri, Password: password, + DB: db, }) err := client.Ping().Err() diff --git a/dashboard/service.go b/dashboard/service.go index 0a41c03..c322d77 100644 --- a/dashboard/service.go +++ b/dashboard/service.go @@ -2,14 +2,15 @@ package main import ( "fmt" - "github.com/rs/xid" "log" + "os" + "strconv" + "strings" + "github.com/rs/xid" lib2 "github.com/s8sg/goflow/dashboard/lib" goflow3 "github.com/s8sg/goflow/v1" redis "gopkg.in/redis.v5" - "os" - "strings" ) var rdb *redis.Client @@ -115,6 +116,7 @@ func executeFlow(flow string, data []byte) (string, error) { fs := goflow3.FlowService{ RedisURL: getRedisAddr(), RedisPassword: getRedisPassword(), + RedisDB: getRedisDB(), } requestId := getNewId() @@ -136,6 +138,7 @@ func pauseRequest(flow string, requestID string) error { fs := &goflow3.FlowService{ RedisURL: getRedisAddr(), RedisPassword: getRedisPassword(), + RedisDB: getRedisDB(), } err := fs.Pause(flow, requestID) @@ -151,6 +154,7 @@ func resumeRequest(flow string, requestID string) error { fs := &goflow3.FlowService{ RedisURL: getRedisAddr(), RedisPassword: getRedisPassword(), + RedisDB: getRedisDB(), } err := fs.Resume(flow, requestID) @@ -166,6 +170,7 @@ func stopRequest(flow string, requestID string) error { fs := &goflow3.FlowService{ RedisURL: getRedisAddr(), RedisPassword: getRedisPassword(), + RedisDB: getRedisDB(), } err := fs.Stop(flow, requestID) @@ -179,11 +184,12 @@ func stopRequest(flow string, requestID string) error { func getRDB() *redis.Client { addr := getRedisAddr() password := getRedisPassword() + db := getRedisDB() if rdb == nil { rdb = redis.NewClient(&redis.Options{ Addr: addr, Password: password, - DB: 0, + DB: db, }) } return rdb @@ -202,6 +208,19 @@ func getRedisPassword() string { return addr } +func getRedisDB() int { + dbStr := os.Getenv("REDIS_DB") + if dbStr == "" { + return 0 + } + db, err := strconv.Atoi(dbStr) + if err != nil { + log.Printf("Failed get redisDB, %v", err) + return 0 + } + return db +} + func getNewId() string { guid := xid.New() return guid.String() diff --git a/runtime/flow_runtime.go b/runtime/flow_runtime.go index ed5134a..1f25b26 100644 --- a/runtime/flow_runtime.go +++ b/runtime/flow_runtime.go @@ -27,6 +27,7 @@ type FlowRuntime struct { OpenTracingUrl string RedisURL string RedisPassword string + RedisDB int stateStore sdk.StateStore DataStore sdk.DataStore Logger sdk.Logger @@ -84,16 +85,16 @@ func (fRuntime *FlowRuntime) Init() error { fRuntime.rdb = redis.NewClient(&redis.Options{ Addr: fRuntime.RedisURL, Password: fRuntime.RedisPassword, - DB: 0, + DB: fRuntime.RedisDB, }) - fRuntime.stateStore, err = initStateStore(fRuntime.RedisURL, fRuntime.RedisPassword) + fRuntime.stateStore, err = initStateStore(fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB) if err != nil { return fmt.Errorf("failed to initialize the StateStore, %v", err) } if fRuntime.DataStore == nil { - fRuntime.DataStore, err = initDataStore(fRuntime.RedisURL, fRuntime.RedisPassword) + fRuntime.DataStore, err = initDataStore(fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB) if err != nil { return fmt.Errorf("failed to initialize the StateStore, %v", err) } @@ -139,7 +140,7 @@ func OpenConnectionV2(tag string, network string, address string, password strin func (fRuntime *FlowRuntime) Execute(flowName string, request *runtime.Request) error { - connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil) + connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB, nil) if err != nil { return fmt.Errorf("failed to initiate connection, error %v", err) } @@ -165,7 +166,7 @@ func (fRuntime *FlowRuntime) Execute(flowName string, request *runtime.Request) } func (fRuntime *FlowRuntime) Pause(flowName string, request *runtime.Request) error { - connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil) + connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB, nil) if err != nil { return fmt.Errorf("failed to initiate connection, error %v", err) } @@ -190,7 +191,7 @@ func (fRuntime *FlowRuntime) Pause(flowName string, request *runtime.Request) er } func (fRuntime *FlowRuntime) Stop(flowName string, request *runtime.Request) error { - connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil) + connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB, nil) if err != nil { return fmt.Errorf("failed to initiate connection, error %v", err) } @@ -215,7 +216,7 @@ func (fRuntime *FlowRuntime) Stop(flowName string, request *runtime.Request) err } func (fRuntime *FlowRuntime) Resume(flowName string, request *runtime.Request) error { - connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil) + connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB, nil) if err != nil { return fmt.Errorf("failed to initiate connection, error %v", err) } @@ -262,7 +263,7 @@ func (fRuntime *FlowRuntime) StopServer() error { // StartQueueWorker starts listening for request in queue func (fRuntime *FlowRuntime) StartQueueWorker(errorChan chan error) error { - connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil) + connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB, nil) if err != nil { return fmt.Errorf("failed to initiate connection, error %v", err) } diff --git a/runtime/init_data_store.go b/runtime/init_data_store.go index cf6557d..395877b 100644 --- a/runtime/init_data_store.go +++ b/runtime/init_data_store.go @@ -5,7 +5,7 @@ import ( "github.com/s8sg/goflow/core/sdk" ) -func initDataStore(redisURI string, password string) (dataStore sdk.DataStore, err error) { - dataStore, err = redisDataStore.GetRedisDataStore(redisURI, password) +func initDataStore(redisURI string, password string, db int) (dataStore sdk.DataStore, err error) { + dataStore, err = redisDataStore.GetRedisDataStore(redisURI, password, db) return dataStore, err } diff --git a/runtime/init_state_store.go b/runtime/init_state_store.go index e55ee33..c7173e0 100644 --- a/runtime/init_state_store.go +++ b/runtime/init_state_store.go @@ -5,7 +5,7 @@ import ( "github.com/s8sg/goflow/core/sdk" ) -func initStateStore(redisURI string, password string) (stateStore sdk.StateStore, err error) { - stateStore, err = redisStateStore.GetRedisStateStore(redisURI, password) +func initStateStore(redisURI string, password string, db int) (stateStore sdk.StateStore, err error) { + stateStore, err = redisStateStore.GetRedisStateStore(redisURI, password, db) return stateStore, err } diff --git a/v1/goflow.go b/v1/goflow.go index 03b4dea..002b226 100644 --- a/v1/goflow.go +++ b/v1/goflow.go @@ -13,6 +13,7 @@ type FlowService struct { Port int RedisURL string RedisPassword string + RedisDB int RequestAuthSharedSecret string RequestAuthEnabled bool WorkerConcurrency int @@ -54,6 +55,7 @@ func (fs *FlowService) Execute(flowName string, req *Request) error { fs.runtime = &runtime.FlowRuntime{ RedisURL: fs.RedisURL, RedisPassword: fs.RedisPassword, + RedisDB: fs.RedisDB, RequestAuthEnabled: fs.RequestAuthEnabled, RequestAuthSharedSecret: fs.RequestAuthSharedSecret, } @@ -86,6 +88,7 @@ func (fs *FlowService) Pause(flowName string, requestId string) error { fs.runtime = &runtime.FlowRuntime{ RedisURL: fs.RedisURL, RedisPassword: fs.RedisPassword, + RedisDB: fs.RedisDB, RequestAuthEnabled: fs.RequestAuthEnabled, RequestAuthSharedSecret: fs.RequestAuthSharedSecret, } @@ -115,6 +118,7 @@ func (fs *FlowService) Resume(flowName string, requestId string) error { fs.runtime = &runtime.FlowRuntime{ RedisURL: fs.RedisURL, RedisPassword: fs.RedisPassword, + RedisDB: fs.RedisDB, RequestAuthEnabled: fs.RequestAuthEnabled, RequestAuthSharedSecret: fs.RequestAuthSharedSecret, } @@ -144,6 +148,7 @@ func (fs *FlowService) Stop(flowName string, requestId string) error { fs.runtime = &runtime.FlowRuntime{ RedisURL: fs.RedisURL, RedisPassword: fs.RedisPassword, + RedisDB: fs.RedisDB, RequestAuthEnabled: fs.RequestAuthEnabled, RequestAuthSharedSecret: fs.RequestAuthSharedSecret, } @@ -191,6 +196,7 @@ func (fs *FlowService) Start() error { OpenTracingUrl: fs.OpenTraceUrl, RedisURL: fs.RedisURL, RedisPassword: fs.RedisPassword, + RedisDB: fs.RedisDB, DataStore: fs.DataStore, Logger: fs.Logger, ServerPort: fs.Port, @@ -222,6 +228,7 @@ func (fs *FlowService) StartServer() error { OpenTracingUrl: fs.OpenTraceUrl, RedisURL: fs.RedisURL, RedisPassword: fs.RedisPassword, + RedisDB: fs.RedisDB, DataStore: fs.DataStore, Logger: fs.Logger, ServerPort: fs.Port, @@ -251,6 +258,7 @@ func (fs *FlowService) StartWorker() error { OpenTracingUrl: fs.OpenTraceUrl, RedisURL: fs.RedisURL, RedisPassword: fs.RedisPassword, + RedisDB: fs.RedisDB, DataStore: fs.DataStore, Logger: fs.Logger, Concurrency: fs.WorkerConcurrency,