Skip to content

Commit

Permalink
fix: reduce clustering
Browse files Browse the repository at this point in the history
  • Loading branch information
Reasno committed Sep 10, 2021
1 parent ff42ae3 commit a834c17
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 46 deletions.
61 changes: 34 additions & 27 deletions dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ type ConsumableDispatcher interface {
Consume(ctx context.Context) error
}

// Configuration is the struct for queue configs.
type Configuration struct {
// configuration is the struct for queue configs.
type configuration struct {
RedisName string `yaml:"redisName" json:"redisName"`
Parallelism int `yaml:"parallelism" json:"parallelism"`
CheckQueueLengthIntervalSecond int `yaml:"checkQueueLengthIntervalSecond" json:"checkQueueLengthIntervalSecond"`
Expand All @@ -73,11 +73,8 @@ type makerIn struct {
JobDispatcher JobDispatcher `optional:"true"`
EventDispatcher contract.Dispatcher `optional:"true"`
Logger log.Logger
AppName contract.AppName
Env contract.Env
Gauge Gauge `optional:"true"`
Populator contract.DIPopulator `optional:"true"`
Driver Driver `optional:"true"`
}

// makerOut is the di output JobFrom provideDispatcherFactory
Expand All @@ -99,7 +96,7 @@ func provideDispatcherFactory(option *providersOption) func(p makerIn) (makerOut
return func(p makerIn) (makerOut, error) {
var (
err error
queueConfs map[string]Configuration
queueConfs map[string]configuration
)
err = p.Conf.Unmarshal("queue", &queueConfs)
if err != nil {
Expand All @@ -108,14 +105,14 @@ func provideDispatcherFactory(option *providersOption) func(p makerIn) (makerOut
factory := di.NewFactory(func(name string) (di.Pair, error) {
var (
ok bool
conf Configuration
conf configuration
)
p := p
if conf, ok = queueConfs[name]; !ok {
if name != "default" {
return di.Pair{}, fmt.Errorf("queue Configuration %s not found", name)
return di.Pair{}, fmt.Errorf("queue configuration %s not found", name)
}
conf = Configuration{
conf = configuration{
Parallelism: runtime.NumCPU(),
CheckQueueLengthIntervalSecond: 0,
}
Expand All @@ -135,12 +132,8 @@ func provideDispatcherFactory(option *providersOption) func(p makerIn) (makerOut
var driver = option.driver
if option.driver == nil {
driver, err = option.driverConstructor(
DriverConstructorArgs{
DriverArgs{
Name: name,
Conf: conf,
Logger: p.Logger,
AppName: p.AppName,
Env: p.Env,
Populator: p.Populator,
},
)
Expand Down Expand Up @@ -191,27 +184,41 @@ func (d makerOut) ProvideRunGroup(group *run.Group) {
}
}

func newDefaultDriver(args DriverConstructorArgs) (Driver, error) {
var maker otredis.Maker
func newDefaultDriver(args DriverArgs) (Driver, error) {
var injected struct {
di.In

contract.AppName
contract.Env
contract.Logger
otredis.Maker
contract.ConfigUnmarshaler
}

if args.Populator == nil {
return nil, errors.New("the default driver requires setting the populator in DI container")
}
if err := args.Populator.Populate(&maker); err != nil {
return nil, fmt.Errorf("the default driver requires an otredis.Maker in DI container: %w", err)
if err := args.Populator.Populate(&injected); err != nil {
return nil, fmt.Errorf("missing dependency for the default queue driver: %w", err)
}
var redisName string
if err := injected.ConfigUnmarshaler.Unmarshal(fmt.Sprintf("queue.%s.redisName", injected.AppName), &redisName); err != nil {
return nil, fmt.Errorf("bad configuration: %w", err)
}
client, err := maker.Make(args.Conf.RedisName)

client, err := injected.Maker.Make(redisName)
if err != nil {
return nil, fmt.Errorf("the default driver requires the redis client called %s: %w", args.Conf.RedisName, err)
return nil, fmt.Errorf("the default driver requires the redis client called %s: %w", redisName, err)
}
return &RedisDriver{
Logger: args.Logger,
Logger: injected.Logger,
RedisClient: client,
ChannelConfig: ChannelConfig{
Delayed: fmt.Sprintf("{%s:%s:%s}:delayed", args.AppName.String(), args.Env.String(), args.Name),
Failed: fmt.Sprintf("{%s:%s:%s}:failed", args.AppName.String(), args.Env.String(), args.Name),
Reserved: fmt.Sprintf("{%s:%s:%s}:reserved", args.AppName.String(), args.Env.String(), args.Name),
Waiting: fmt.Sprintf("{%s:%s:%s}:waiting", args.AppName.String(), args.Env.String(), args.Name),
Timeout: fmt.Sprintf("{%s:%s:%s}:timeout", args.AppName.String(), args.Env.String(), args.Name),
Delayed: fmt.Sprintf("{%s:%s:%s}:delayed", injected.AppName.String(), injected.Env.String(), args.Name),
Failed: fmt.Sprintf("{%s:%s:%s}:failed", injected.AppName.String(), injected.Env.String(), args.Name),
Reserved: fmt.Sprintf("{%s:%s:%s}:reserved", injected.AppName.String(), injected.Env.String(), args.Name),
Waiting: fmt.Sprintf("{%s:%s:%s}:waiting", injected.AppName.String(), injected.Env.String(), args.Name),
Timeout: fmt.Sprintf("{%s:%s:%s}:timeout", injected.AppName.String(), injected.Env.String(), args.Name),
},
}, nil
}
Expand Down Expand Up @@ -239,7 +246,7 @@ func provideConfig() configOut {
configs := []config.ExportedConfig{{
Owner: "queue",
Data: map[string]interface{}{
"queue": map[string]Configuration{
"queue": map[string]configuration{
"default": {
RedisName: "default",
Parallelism: runtime.NumCPU(),
Expand Down
13 changes: 4 additions & 9 deletions dependency_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package queue

import (
"github.com/DoNewsCode/core/contract"
"github.com/go-kit/kit/log"
)

type providersOption struct {
driver Driver
driverConstructor func(args DriverConstructorArgs) (Driver, error)
driverConstructor func(args DriverArgs) (Driver, error)
}

// ProvidersOptionFunc is the type of functional providersOption for Providers. Use this type to change how Providers work.
Expand All @@ -24,18 +23,14 @@ func WithDriver(driver Driver) ProvidersOptionFunc {

// WithDriverConstructor instructs the Providers to accept an alternative constructor for queue driver.
// If the WithDriver option is set, this option becomes an no-op.
func WithDriverConstructor(f func(args DriverConstructorArgs) (Driver, error)) ProvidersOptionFunc {
func WithDriverConstructor(f func(args DriverArgs) (Driver, error)) ProvidersOptionFunc {
return func(options *providersOption) {
options.driverConstructor = f
}
}

// DriverConstructorArgs are arguments to construct the driver. See WithDriverConstructor.
type DriverConstructorArgs struct {
// DriverArgs are arguments to construct the driver. See WithDriverConstructor.
type DriverArgs struct {
Name string
Conf Configuration
Logger log.Logger
AppName contract.AppName
Env contract.Env
Populator contract.DIPopulator
}
28 changes: 20 additions & 8 deletions dependency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/DoNewsCode/core/config"
"github.com/DoNewsCode/core/contract"
"github.com/DoNewsCode/core/di"
"github.com/DoNewsCode/core/otredis"
"github.com/go-kit/kit/log"
Expand All @@ -23,13 +24,28 @@ func (m maker) Make(name string) (redis.UniversalClient, error) {
type populator struct{}

func (p populator) Populate(target interface{}) error {
*(target.(*otredis.Maker)) = maker{}
return nil
g := di.NewGraph()
g.Provide(func() contract.AppName {
return config.AppName("test")
})
g.Provide(func() contract.Env {
return config.Env("test")
})
g.Provide(func() log.Logger {
return log.NewNopLogger()
})
g.Provide(func() otredis.Maker {
return maker{}
})
g.Provide(func() contract.ConfigUnmarshaler {
return config.MapAdapter{"queue": map[string]interface{}{"default": map[string]interface{}{"redisName": "default"}}}
})
return di.IntoPopulator(g).Populate(target)
}

func TestProvideDispatcher(t *testing.T) {
out, err := provideDispatcherFactory(&providersOption{})(makerIn{
Conf: config.WithAccessor(config.MapAdapter{"queue": map[string]Configuration{
Conf: config.WithAccessor(config.MapAdapter{"queue": map[string]configuration{
"default": {
"default",
1,
Expand All @@ -44,8 +60,6 @@ func TestProvideDispatcher(t *testing.T) {
JobDispatcher: &SyncDispatcher{},
Populator: populator{},
Logger: log.NewNopLogger(),
AppName: config.AppName("test"),
Env: config.EnvTesting,
})
assert.NoError(t, err)
assert.NotNil(t, out.DispatcherFactory)
Expand Down Expand Up @@ -92,7 +106,7 @@ func (m mockDriver) Retry(ctx context.Context, message *PersistedJob) error {

func TestProvideDispatcher_withDriver(t *testing.T) {
out, err := provideDispatcherFactory(&providersOption{driver: mockDriver{}})(makerIn{
Conf: config.WithAccessor(config.MapAdapter{"queue": map[string]Configuration{
Conf: config.WithAccessor(config.MapAdapter{"queue": map[string]configuration{
"default": {
"default",
1,
Expand All @@ -106,8 +120,6 @@ func TestProvideDispatcher_withDriver(t *testing.T) {
}}),
JobDispatcher: &SyncDispatcher{},
Logger: log.NewNopLogger(),
AppName: config.AppName("test"),
Env: config.EnvTesting,
})
assert.NoError(t, err)
assert.NotNil(t, out.DispatcherFactory)
Expand Down
4 changes: 2 additions & 2 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
// Decorate(s *PersistedJob)
// }
//
// The PersistentJob passed to the Decorate method contains the tunable Configuration such as maximum retries.
// The PersistentJob passed to the Decorate method contains the tunable configuration such as maximum retries.
//
// No matter how you create a persisted Job, to fire it, send it though a dispatcher. The normal dispatcher in the
// Jobs package won't work, as a queue implementation is required. Luckily, it is deadly simple to convert a standard
Expand All @@ -61,7 +61,7 @@
//
// Integrate
//
// The queue package exports Configuration in this format:
// The queue package exports configuration in this format:
//
// queue:
// default:
Expand Down

0 comments on commit a834c17

Please sign in to comment.