feat: piped redis broker and results #44

Merged · 2 commits · Jul 15, 2024
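Summary (reconstructed from the diff below): this PR adds an optional `PipePeriod` setting to both the Redis broker and the Redis results store. When it is non-zero, enqueue and result-write commands are buffered on a `redis.Pipeliner` instead of being sent immediately, and a background `execPipe` goroutine flushes the pipeline every `PipePeriod`, draining it one final time when its context is closed. The broker also folds the standalone `pollPeriod` field into its stored `Options` and renames the logger field from `log` to `lo`.

A minimal usage sketch of the new option. The import paths and Redis address are assumptions (they are not part of this diff); treat it as an illustration rather than the project's documented API:

```go
package main

import (
	"log/slog"
	"os"
	"time"

	redisbroker "github.com/kalbhor/tasqueue/v2/brokers/redis"  // assumed import path
	redisresults "github.com/kalbhor/tasqueue/v2/results/redis" // assumed import path
)

func main() {
	lo := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// A non-zero PipePeriod switches both components into piped mode:
	// commands accumulate on a pipeline and are flushed every PipePeriod.
	broker := redisbroker.New(redisbroker.Options{
		Addrs:      []string{"127.0.0.1:6379"},
		PipePeriod: 100 * time.Millisecond,
	}, lo)

	results := redisresults.New(redisresults.Options{
		Addrs:      []string{"127.0.0.1:6379"},
		Expiry:     time.Hour,
		PipePeriod: 100 * time.Millisecond,
	}, lo)

	_, _ = broker, results
}
```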
77 changes: 61 additions & 16 deletions brokers/redis/broker.go
@@ -26,21 +26,28 @@ type Options struct {
IdleTimeout time.Duration
MinIdleConns int
PollPeriod time.Duration

// OPTIONAL
// If non-zero, enqueue redis commands are buffered on a pipeline instead of being sent individually.
// The pipeline is executed every `PipePeriod` duration.
PipePeriod time.Duration
}

type Broker struct {
log *slog.Logger
conn redis.UniversalClient
pollPeriod time.Duration
lo *slog.Logger
opts Options

conn redis.UniversalClient
pipe redis.Pipeliner
}

func New(o Options, lo *slog.Logger) *Broker {
pollPeriod := o.PollPeriod
if o.PollPeriod == 0 {
pollPeriod = DefaultPollPeriod
o.PollPeriod = DefaultPollPeriod
}
return &Broker{
log: lo,
b := &Broker{
opts: o,
lo: lo,
conn: redis.NewUniversalClient(&redis.UniversalOptions{
Addrs: o.Addrs,
DB: o.DB,
@@ -51,7 +58,36 @@ func New(o Options, lo *slog.Logger) *Broker {
MinIdleConns: o.MinIdleConns,
IdleTimeout: o.IdleTimeout,
}),
pollPeriod: pollPeriod,
}

if o.PipePeriod != 0 {
b.pipe = b.conn.Pipeline()
go b.execPipe(context.TODO())
}

return b
}

func (r *Broker) execPipe(ctx context.Context) {
tk := time.NewTicker(r.opts.PipePeriod)
for {
select {
case <-ctx.Done():
r.lo.Debug("context closed, draining redis pipe", "length", r.pipe.Len())
if _, err := r.pipe.Exec(ctx); err != nil {
r.lo.Error("could not execute redis pipe", "error", err)
}
return
case <-tk.C:
plen := r.pipe.Len()
if plen == 0 {
continue
}
r.lo.Debug("submitting redis pipe", "length", plen)
if _, err := r.pipe.Exec(ctx); err != nil {
r.lo.Error("could not execute redis pipe", "error", err)
}
}
}
}

@@ -67,10 +103,19 @@ func (r *Broker) GetPending(ctx context.Context, queue string) ([]string, error)
}

func (b *Broker) Enqueue(ctx context.Context, msg []byte, queue string) error {
if b.opts.PipePeriod != 0 {
return b.pipe.LPush(ctx, queue, msg).Err()
}
return b.conn.LPush(ctx, queue, msg).Err()
}

func (b *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error {
if b.opts.PipePeriod != 0 {
return b.pipe.ZAdd(ctx, fmt.Sprintf(sortedSetKey, queue), &redis.Z{
Score: float64(ts.UnixNano()),
Member: msg,
}).Err()
}
return b.conn.ZAdd(ctx, fmt.Sprintf(sortedSetKey, queue), &redis.Z{
Score: float64(ts.UnixNano()),
Member: msg,
@@ -83,19 +128,19 @@ func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
for {
select {
case <-ctx.Done():
b.log.Debug("shutting down consumer..")
b.lo.Debug("shutting down consumer..")
return
default:
b.log.Debug("receiving from consumer..")
res, err := b.conn.BLPop(ctx, b.pollPeriod, queue).Result()
b.lo.Debug("receiving from consumer..")
res, err := b.conn.BLPop(ctx, b.opts.PollPeriod, queue).Result()
if err != nil && err.Error() != "redis: nil" {
b.log.Error("error consuming from redis queue", "error", err)
b.lo.Error("error consuming from redis queue", "error", err)
} else if errors.Is(err, redis.Nil) {
b.log.Debug("no tasks to consume..", "queue", queue)
b.lo.Debug("no tasks to consume..", "queue", queue)
} else {
msg, err := blpopResult(res)
if err != nil {
b.log.Error("error parsing response from redis", "error", err)
b.lo.Error("error parsing response from redis", "error", err)
return
}
work <- []byte(msg)
@@ -105,12 +150,12 @@ func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
}

func (b *Broker) consumeScheduled(ctx context.Context, queue string) {
poll := time.NewTicker(b.pollPeriod)
poll := time.NewTicker(b.opts.PollPeriod)

for {
select {
case <-ctx.Done():
b.log.Debug("shutting down scheduled consumer..")
b.lo.Debug("shutting down scheduled consumer..")
return
case <-poll.C:
b.conn.Watch(ctx, func(tx *redis.Tx) error {
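One behavioral consequence of the broker change: when `PipePeriod` is non-zero, `Enqueue` and `EnqueueScheduled` return the error from queueing the command onto the pipeline, not from executing it against Redis; execution errors surface only in the logs of the `execPipe` goroutine. A small sketch of that distinction, assuming go-redis v8 (the `&redis.Z{}` usage above suggests v8) and a local Redis instance:

```go
package main

import (
	"context"
	"fmt"

	"github.com/go-redis/redis/v8" // assumed client version
)

func main() {
	ctx := context.Background()
	rdb := redis.NewUniversalClient(&redis.UniversalOptions{Addrs: []string{"127.0.0.1:6379"}})
	pipe := rdb.Pipeline()

	// Queueing only: nothing has been sent to Redis yet, so this error
	// reflects local queueing. This is what a piped Enqueue returns.
	if err := pipe.LPush(ctx, "queue:tasks", "payload").Err(); err != nil {
		fmt.Println("could not queue command:", err)
	}

	// Network and server errors appear only when the pipeline is executed,
	// which this PR does on the PipePeriod ticker inside execPipe.
	if _, err := pipe.Exec(ctx); err != nil {
		fmt.Println("could not execute pipeline:", err)
	}
}
```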
81 changes: 67 additions & 14 deletions results/redis/results.go
@@ -18,9 +18,10 @@ const (
)

type Results struct {
opt Options
opts Options
lo *slog.Logger
conn redis.UniversalClient
pipe redis.Pipeliner
}

type Options struct {
@@ -34,6 +35,11 @@ type Options struct {
Expiry time.Duration
MetaExpiry time.Duration
MinIdleConns int

// OPTIONAL
// If non-zero, enqueue redis commands are buffered on a pipeline instead of being sent individually.
// The pipeline is executed every `PipePeriod` duration.
PipePeriod time.Duration
}

func DefaultRedis() Options {
@@ -46,7 +52,7 @@ func DefaultRedis() Options {

func New(o Options, lo *slog.Logger) *Results {
rs := &Results{
opt: o,
opts: o,
conn: redis.NewUniversalClient(
&redis.UniversalOptions{
Addrs: o.Addrs,
@@ -66,26 +72,50 @@ func New(o Options, lo *slog.Logger) *Results {
if o.MetaExpiry != 0 {
go rs.expireMeta(o.MetaExpiry)
}
if o.PipePeriod != 0 {
rs.pipe = rs.conn.Pipeline()
go rs.execPipe(context.TODO())
}

return rs
}

func (r *Results) execPipe(ctx context.Context) {
tk := time.NewTicker(r.opts.PipePeriod)
for {
select {
case <-ctx.Done():
r.lo.Debug("context closed, draining redis pipe", "length", r.pipe.Len())
if _, err := r.pipe.Exec(ctx); err != nil {
r.lo.Error("could not execute redis pipe", "error", err)
}
return
case <-tk.C:
plen := r.pipe.Len()
if plen == 0 {
continue
}
r.lo.Debug("submitting redis pipe", "length", plen)
if _, err := r.pipe.Exec(ctx); err != nil {
r.lo.Error("could not execute redis pipe", "error", err)
}
}
}
}

func (r *Results) DeleteJob(ctx context.Context, id string) error {
r.lo.Debug("deleting job")

pipe := r.conn.Pipeline()
if err := pipe.ZRem(ctx, resultPrefix+success, 1, id).Err(); err != nil {
return err
}

if err := pipe.ZRem(ctx, resultPrefix+failed, 1, id).Err(); err != nil {
return err
}

if err := pipe.Del(ctx, resultPrefix+id).Err(); err != nil {
return err
}

if _, err := pipe.Exec(ctx); err != nil {
return err
}
@@ -123,6 +153,12 @@ func (r *Results) GetFailed(ctx context.Context) ([]string, error) {

func (r *Results) SetSuccess(ctx context.Context, id string) error {
r.lo.Debug("setting job as successful", "id", id)
if r.opts.PipePeriod != 0 {
return r.pipe.ZAdd(ctx, resultPrefix+success, &redis.Z{
Score: float64(time.Now().UnixNano()),
Member: id,
}).Err()
}
return r.conn.ZAdd(ctx, resultPrefix+success, &redis.Z{
Score: float64(time.Now().UnixNano()),
Member: id,
@@ -131,6 +167,12 @@ func (r *Results) SetFailed(ctx context.Context, id string) error {

func (r *Results) SetFailed(ctx context.Context, id string) error {
r.lo.Debug("setting job as failed", "id", id)
if r.opts.PipePeriod != 0 {
return r.pipe.ZAdd(ctx, resultPrefix+failed, &redis.Z{
Score: float64(time.Now().UnixNano()),
Member: id,
}).Err()
}
return r.conn.ZAdd(ctx, resultPrefix+failed, &redis.Z{
Score: float64(time.Now().UnixNano()),
Member: id,
@@ -139,12 +181,14 @@ func (r *Results) SetFailed(ctx context.Context, id string) error {

func (r *Results) Set(ctx context.Context, id string, b []byte) error {
r.lo.Debug("setting result for job", "id", id)
return r.conn.Set(ctx, resultPrefix+id, b, r.opt.Expiry).Err()
if r.opts.PipePeriod != 0 {
return r.pipe.Set(ctx, resultPrefix+id, b, r.opts.Expiry).Err()
}
return r.conn.Set(ctx, resultPrefix+id, b, r.opts.Expiry).Err()
}

func (r *Results) Get(ctx context.Context, id string) ([]byte, error) {
r.lo.Debug("getting result for job", "id", id)

rs, err := r.conn.Get(ctx, resultPrefix+id).Bytes()
if err != nil {
return nil, err
@@ -171,13 +215,22 @@ func (r *Results) expireMeta(ttl time.Duration) {
score := strconv.FormatInt(now, 10)

r.lo.Debug("purging failed results metadata", "score", score)
if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+failed, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
}

r.lo.Debug("purging success results metadata", "score", score)
if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+success, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
if r.opts.PipePeriod != 0 {
if err := r.pipe.ZRemRangeByScore(context.Background(), resultPrefix+failed, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
}
r.lo.Debug("purging success results metadata", "score", score)
if err := r.pipe.ZRemRangeByScore(context.Background(), resultPrefix+success, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
}
} else {
if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+failed, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
}
r.lo.Debug("purging success results metadata", "score", score)
if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+success, "0", score).Err(); err != nil {
r.lo.Error("could not expire success/failed metadata", "err", err)
}
}
}
}
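The results store follows the same pattern: with `PipePeriod` set, `Set`, `SetSuccess`, `SetFailed`, and the `expireMeta` purges are buffered on the pipeline, while `Get` still reads directly from the connection, so a freshly written result may not be visible until the next flush. A minimal sketch under the same assumptions (import path, Redis address, and timings are illustrative, not taken from the diff):

```go
package main

import (
	"context"
	"log/slog"
	"os"
	"time"

	redisresults "github.com/kalbhor/tasqueue/v2/results/redis" // assumed import path
)

func main() {
	ctx := context.Background()
	lo := slog.New(slog.NewTextHandler(os.Stderr, nil))

	res := redisresults.New(redisresults.Options{
		Addrs:      []string{"127.0.0.1:6379"},
		Expiry:     time.Hour,
		PipePeriod: 50 * time.Millisecond, // buffered writes are flushed on this interval
	}, lo)

	// Buffered on the pipeline; it reaches Redis on the next flush tick.
	if err := res.Set(ctx, "job-1", []byte(`{"status":"done"}`)); err != nil {
		lo.Error("could not queue result", "error", err)
	}

	// Get reads straight from the connection, so allow a flush to happen first.
	time.Sleep(100 * time.Millisecond)
	if b, err := res.Get(ctx, "job-1"); err == nil {
		lo.Info("fetched result", "body", string(b))
	}
}
```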