Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support redis db #78

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/redis-datastore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ type RedisDataStore struct {
redisClient redis.UniversalClient
}

func GetRedisDataStore(redisUri string, password string) (sdk.DataStore, error) {
func GetRedisDataStore(redisUri string, password string, db int) (sdk.DataStore, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we get the password and string from enviroment?

ds := &RedisDataStore{}
client := redis.NewClient(&redis.Options{
Addr: redisUri,
Password: password,
DB: db,
})
err := client.Ping().Err()
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion core/redis-statestore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ type Incrementer interface {
Incr(key string, value int64) (int64, error)
}

func GetRedisStateStore(redisUri string, password string) (sdk.StateStore, error) {
func GetRedisStateStore(redisUri string, password string, db int) (sdk.StateStore, error) {
stateStore := &RedisStateStore{}

client := redis.NewClient(&redis.Options{
Addr: redisUri,
Password: password,
DB: db,
})

err := client.Ping().Err()
Expand Down
27 changes: 23 additions & 4 deletions dashboard/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package main

import (
"fmt"
"github.com/rs/xid"
"log"
"os"
"strconv"
"strings"

"github.com/rs/xid"
lib2 "github.com/s8sg/goflow/dashboard/lib"
goflow3 "github.com/s8sg/goflow/v1"
redis "gopkg.in/redis.v5"
"os"
"strings"
)

var rdb *redis.Client
Expand Down Expand Up @@ -115,6 +116,7 @@ func executeFlow(flow string, data []byte) (string, error) {
fs := goflow3.FlowService{
RedisURL: getRedisAddr(),
RedisPassword: getRedisPassword(),
RedisDB: getRedisDB(),
}

requestId := getNewId()
Expand All @@ -136,6 +138,7 @@ func pauseRequest(flow string, requestID string) error {
fs := &goflow3.FlowService{
RedisURL: getRedisAddr(),
RedisPassword: getRedisPassword(),
RedisDB: getRedisDB(),
}

err := fs.Pause(flow, requestID)
Expand All @@ -151,6 +154,7 @@ func resumeRequest(flow string, requestID string) error {
fs := &goflow3.FlowService{
RedisURL: getRedisAddr(),
RedisPassword: getRedisPassword(),
RedisDB: getRedisDB(),
}

err := fs.Resume(flow, requestID)
Expand All @@ -166,6 +170,7 @@ func stopRequest(flow string, requestID string) error {
fs := &goflow3.FlowService{
RedisURL: getRedisAddr(),
RedisPassword: getRedisPassword(),
RedisDB: getRedisDB(),
}

err := fs.Stop(flow, requestID)
Expand All @@ -179,11 +184,12 @@ func stopRequest(flow string, requestID string) error {
func getRDB() *redis.Client {
addr := getRedisAddr()
password := getRedisPassword()
db := getRedisDB()
if rdb == nil {
rdb = redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: 0,
DB: db,
})
}
return rdb
Expand All @@ -202,6 +208,19 @@ func getRedisPassword() string {
return addr
}

func getRedisDB() int {
dbStr := os.Getenv("REDIS_DB")
if dbStr == "" {
return 0
}
db, err := strconv.Atoi(dbStr)
if err != nil {
log.Printf("Failed get redisDB, %v", err)
return 0
}
return db
}

func getNewId() string {
guid := xid.New()
return guid.String()
Expand Down
15 changes: 8 additions & 7 deletions runtime/flow_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type FlowRuntime struct {
OpenTracingUrl string
RedisURL string
RedisPassword string
RedisDB int
stateStore sdk.StateStore
DataStore sdk.DataStore
Logger sdk.Logger
Expand Down Expand Up @@ -87,16 +88,16 @@ func (fRuntime *FlowRuntime) Init() error {
fRuntime.rdb = redis.NewClient(&redis.Options{
Addr: fRuntime.RedisURL,
Password: fRuntime.RedisPassword,
DB: 0,
DB: fRuntime.RedisDB,
})

fRuntime.stateStore, err = initStateStore(fRuntime.RedisURL, fRuntime.RedisPassword)
fRuntime.stateStore, err = initStateStore(fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB)
if err != nil {
return fmt.Errorf("failed to initialize the StateStore, %v", err)
}

if fRuntime.DataStore == nil {
fRuntime.DataStore, err = initDataStore(fRuntime.RedisURL, fRuntime.RedisPassword)
fRuntime.DataStore, err = initDataStore(fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB)
if err != nil {
return fmt.Errorf("failed to initialize the StateStore, %v", err)
}
Expand Down Expand Up @@ -225,7 +226,7 @@ func OpenConnectionV2(tag string, network string, address string, password strin

func (fRuntime *FlowRuntime) Execute(flowName string, request *runtime.Request) error {

connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil)
connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB, nil)
if err != nil {
return fmt.Errorf("failed to initiate connection, error %v", err)
}
Expand All @@ -251,7 +252,7 @@ func (fRuntime *FlowRuntime) Execute(flowName string, request *runtime.Request)
}

func (fRuntime *FlowRuntime) Pause(flowName string, request *runtime.Request) error {
connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil)
connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB, nil)
if err != nil {
return fmt.Errorf("failed to initiate connection, error %v", err)
}
Expand All @@ -276,7 +277,7 @@ func (fRuntime *FlowRuntime) Pause(flowName string, request *runtime.Request) er
}

func (fRuntime *FlowRuntime) Stop(flowName string, request *runtime.Request) error {
connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil)
connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB, nil)
if err != nil {
return fmt.Errorf("failed to initiate connection, error %v", err)
}
Expand All @@ -301,7 +302,7 @@ func (fRuntime *FlowRuntime) Stop(flowName string, request *runtime.Request) err
}

func (fRuntime *FlowRuntime) Resume(flowName string, request *runtime.Request) error {
connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil)
connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, fRuntime.RedisDB, nil)
if err != nil {
return fmt.Errorf("failed to initiate connection, error %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions runtime/init_data_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"github.com/s8sg/goflow/core/sdk"
)

func initDataStore(redisURI string, password string) (dataStore sdk.DataStore, err error) {
dataStore, err = redisDataStore.GetRedisDataStore(redisURI, password)
func initDataStore(redisURI string, password string, db int) (dataStore sdk.DataStore, err error) {
dataStore, err = redisDataStore.GetRedisDataStore(redisURI, password, db)
return dataStore, err
}
4 changes: 2 additions & 2 deletions runtime/init_state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"github.com/s8sg/goflow/core/sdk"
)

func initStateStore(redisURI string, password string) (stateStore sdk.StateStore, err error) {
stateStore, err = redisStateStore.GetRedisStateStore(redisURI, password)
func initStateStore(redisURI string, password string, db int) (stateStore sdk.StateStore, err error) {
stateStore, err = redisStateStore.GetRedisStateStore(redisURI, password, db)
return stateStore, err
}
8 changes: 5 additions & 3 deletions v1/goflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type FlowService struct {
Port int
RedisURL string
RedisPassword string
RedisDB int
RequestAuthSharedSecret string
RequestAuthEnabled bool
WorkerConcurrency int
Expand Down Expand Up @@ -54,6 +55,7 @@ func (fs *FlowService) Execute(flowName string, req *Request) error {
fs.runtime = &runtime.FlowRuntime{
RedisURL: fs.RedisURL,
RedisPassword: fs.RedisPassword,
RedisDB: fs.RedisDB,
RequestAuthEnabled: fs.RequestAuthEnabled,
RequestAuthSharedSecret: fs.RequestAuthSharedSecret,
}
Expand Down Expand Up @@ -86,6 +88,7 @@ func (fs *FlowService) Pause(flowName string, requestId string) error {
fs.runtime = &runtime.FlowRuntime{
RedisURL: fs.RedisURL,
RedisPassword: fs.RedisPassword,
RedisDB: fs.RedisDB,
RequestAuthEnabled: fs.RequestAuthEnabled,
RequestAuthSharedSecret: fs.RequestAuthSharedSecret,
}
Expand Down Expand Up @@ -115,6 +118,7 @@ func (fs *FlowService) Resume(flowName string, requestId string) error {
fs.runtime = &runtime.FlowRuntime{
RedisURL: fs.RedisURL,
RedisPassword: fs.RedisPassword,
RedisDB: fs.RedisDB,
RequestAuthEnabled: fs.RequestAuthEnabled,
RequestAuthSharedSecret: fs.RequestAuthSharedSecret,
}
Expand Down Expand Up @@ -144,6 +148,7 @@ func (fs *FlowService) Stop(flowName string, requestId string) error {
fs.runtime = &runtime.FlowRuntime{
RedisURL: fs.RedisURL,
RedisPassword: fs.RedisPassword,
RedisDB: fs.RedisDB,
RequestAuthEnabled: fs.RequestAuthEnabled,
RequestAuthSharedSecret: fs.RequestAuthSharedSecret,
}
Expand Down Expand Up @@ -198,7 +203,6 @@ func (fs *FlowService) Register(flowName string, handler runtime.FlowDefinitionH

func (fs *FlowService) Start() error {
fs.ConfigureDefault()

errorChan := make(chan error)
defer close(errorChan)

Expand All @@ -216,7 +220,6 @@ func (fs *FlowService) Start() error {

func (fs *FlowService) StartServer() error {
fs.ConfigureDefault()

errorChan := make(chan error)
defer close(errorChan)
if err := fs.initRuntime(errorChan); err != nil {
Expand All @@ -234,7 +237,6 @@ func (fs *FlowService) StartServer() error {

func (fs *FlowService) StartWorker() error {
fs.ConfigureDefault()

errorChan := make(chan error)
defer close(errorChan)

Expand Down