diff --git a/brokers/redis/broker.go b/brokers/redis/broker.go index 2bf3c69..b7c4d99 100644 --- a/brokers/redis/broker.go +++ b/brokers/redis/broker.go @@ -26,21 +26,28 @@ type Options struct { IdleTimeout time.Duration MinIdleConns int PollPeriod time.Duration + + // OPTIONAL + // If non-zero, enqueue redis commands will be piped instead of being directly sent each time. + // The pipe will be executed every `PipePeriod` duration. + PipePeriod time.Duration } type Broker struct { - log *slog.Logger - conn redis.UniversalClient - pollPeriod time.Duration + lo *slog.Logger + opts Options + + conn redis.UniversalClient + pipe redis.Pipeliner } func New(o Options, lo *slog.Logger) *Broker { - pollPeriod := o.PollPeriod if o.PollPeriod == 0 { - pollPeriod = DefaultPollPeriod + o.PollPeriod = DefaultPollPeriod } - return &Broker{ - log: lo, + b := &Broker{ + opts: o, + lo: lo, conn: redis.NewUniversalClient(&redis.UniversalOptions{ Addrs: o.Addrs, DB: o.DB, @@ -51,7 +58,36 @@ func New(o Options, lo *slog.Logger) *Broker { MinIdleConns: o.MinIdleConns, IdleTimeout: o.IdleTimeout, }), - pollPeriod: pollPeriod, + } + + if o.PipePeriod != 0 { + b.pipe = b.conn.Pipeline() + go b.execPipe(context.TODO()) + } + + return b +} + +func (r *Broker) execPipe(ctx context.Context) { + tk := time.NewTicker(r.opts.PipePeriod) + for { + select { + case <-ctx.Done(): + r.lo.Debug("context closed, draining redis pipe", "length", r.pipe.Len()) + if _, err := r.pipe.Exec(ctx); err != nil { + r.lo.Error("could not execute redis pipe", "error", err) + } + return + case <-tk.C: + plen := r.pipe.Len() + if plen == 0 { + continue + } + r.lo.Debug("submitting redis pipe", "length", plen) + if _, err := r.pipe.Exec(ctx); err != nil { + r.lo.Error("could not execute redis pipe", "error", err) + } + } } } @@ -67,10 +103,19 @@ func (r *Broker) GetPending(ctx context.Context, queue string) ([]string, error) } func (b *Broker) Enqueue(ctx context.Context, msg []byte, queue string) error { + if b.opts.PipePeriod != 0 { + return b.pipe.LPush(ctx, queue, msg).Err() + } return b.conn.LPush(ctx, queue, msg).Err() } func (b *Broker) EnqueueScheduled(ctx context.Context, msg []byte, queue string, ts time.Time) error { + if b.opts.PipePeriod != 0 { + return b.pipe.ZAdd(ctx, fmt.Sprintf(sortedSetKey, queue), &redis.Z{ + Score: float64(ts.UnixNano()), + Member: msg, + }).Err() + } return b.conn.ZAdd(ctx, fmt.Sprintf(sortedSetKey, queue), &redis.Z{ Score: float64(ts.UnixNano()), Member: msg, @@ -83,19 +128,19 @@ func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) { for { select { case <-ctx.Done(): - b.log.Debug("shutting down consumer..") + b.lo.Debug("shutting down consumer..") return default: - b.log.Debug("receiving from consumer..") - res, err := b.conn.BLPop(ctx, b.pollPeriod, queue).Result() + b.lo.Debug("receiving from consumer..") + res, err := b.conn.BLPop(ctx, b.opts.PollPeriod, queue).Result() if err != nil && err.Error() != "redis: nil" { - b.log.Error("error consuming from redis queue", "error", err) + b.lo.Error("error consuming from redis queue", "error", err) } else if errors.Is(err, redis.Nil) { - b.log.Debug("no tasks to consume..", "queue", queue) + b.lo.Debug("no tasks to consume..", "queue", queue) } else { msg, err := blpopResult(res) if err != nil { - b.log.Error("error parsing response from redis", "error", err) + b.lo.Error("error parsing response from redis", "error", err) return } work <- []byte(msg) @@ -105,12 +150,12 @@ func (b *Broker) Consume(ctx context.Context, work chan []byte, queue string) { } func (b *Broker) consumeScheduled(ctx context.Context, queue string) { - poll := time.NewTicker(b.pollPeriod) + poll := time.NewTicker(b.opts.PollPeriod) for { select { case <-ctx.Done(): - b.log.Debug("shutting down scheduled consumer..") + b.lo.Debug("shutting down scheduled consumer..") return case <-poll.C: b.conn.Watch(ctx, func(tx *redis.Tx) error { diff --git a/results/redis/results.go b/results/redis/results.go index 968e6df..f7af666 100644 --- a/results/redis/results.go +++ b/results/redis/results.go @@ -18,9 +18,10 @@ const ( ) type Results struct { - opt Options + opts Options lo *slog.Logger conn redis.UniversalClient + pipe redis.Pipeliner } type Options struct { @@ -34,6 +35,11 @@ type Options struct { Expiry time.Duration MetaExpiry time.Duration MinIdleConns int + + // OPTIONAL + // If non-zero, enqueue redis commands will be piped instead of being directly sent each time. + // The pipe will be executed every `PipePeriod` duration. + PipePeriod time.Duration } func DefaultRedis() Options { @@ -46,7 +52,7 @@ func DefaultRedis() Options { func New(o Options, lo *slog.Logger) *Results { rs := &Results{ - opt: o, + opts: o, conn: redis.NewUniversalClient( &redis.UniversalOptions{ Addrs: o.Addrs, @@ -66,10 +72,37 @@ func New(o Options, lo *slog.Logger) *Results { if o.MetaExpiry != 0 { go rs.expireMeta(o.MetaExpiry) } + if o.PipePeriod != 0 { + rs.pipe = rs.conn.Pipeline() + go rs.execPipe(context.TODO()) + } return rs } +func (r *Results) execPipe(ctx context.Context) { + tk := time.NewTicker(r.opts.PipePeriod) + for { + select { + case <-ctx.Done(): + r.lo.Debug("context closed, draining redis pipe", "length", r.pipe.Len()) + if _, err := r.pipe.Exec(ctx); err != nil { + r.lo.Error("could not execute redis pipe", "error", err) + } + return + case <-tk.C: + plen := r.pipe.Len() + if plen == 0 { + continue + } + r.lo.Debug("submitting redis pipe", "length", plen) + if _, err := r.pipe.Exec(ctx); err != nil { + r.lo.Error("could not execute redis pipe", "error", err) + } + } + } +} + func (r *Results) DeleteJob(ctx context.Context, id string) error { r.lo.Debug("deleting job") @@ -77,15 +110,12 @@ func (r *Results) DeleteJob(ctx context.Context, id string) error { if err := pipe.ZRem(ctx, resultPrefix+success, 1, id).Err(); err != nil { return err } - if err := pipe.ZRem(ctx, resultPrefix+failed, 1, id).Err(); err != nil { return err } - if err := pipe.Del(ctx, resultPrefix+id).Err(); err != nil { return err } - if _, err := pipe.Exec(ctx); err != nil { return err } @@ -123,6 +153,12 @@ func (r *Results) GetFailed(ctx context.Context) ([]string, error) { func (r *Results) SetSuccess(ctx context.Context, id string) error { r.lo.Debug("setting job as successful", "id", id) + if r.opts.PipePeriod != 0 { + return r.pipe.ZAdd(ctx, resultPrefix+success, &redis.Z{ + Score: float64(time.Now().UnixNano()), + Member: id, + }).Err() + } return r.conn.ZAdd(ctx, resultPrefix+success, &redis.Z{ Score: float64(time.Now().UnixNano()), Member: id, @@ -131,6 +167,12 @@ func (r *Results) SetSuccess(ctx context.Context, id string) error { func (r *Results) SetFailed(ctx context.Context, id string) error { r.lo.Debug("setting job as failed", "id", id) + if r.opts.PipePeriod != 0 { + return r.pipe.ZAdd(ctx, resultPrefix+failed, &redis.Z{ + Score: float64(time.Now().UnixNano()), + Member: id, + }).Err() + } return r.conn.ZAdd(ctx, resultPrefix+failed, &redis.Z{ Score: float64(time.Now().UnixNano()), Member: id, @@ -139,12 +181,14 @@ func (r *Results) SetFailed(ctx context.Context, id string) error { func (r *Results) Set(ctx context.Context, id string, b []byte) error { r.lo.Debug("setting result for job", "id", id) - return r.conn.Set(ctx, resultPrefix+id, b, r.opt.Expiry).Err() + if r.opts.PipePeriod != 0 { + return r.pipe.Set(ctx, resultPrefix+id, b, r.opts.Expiry).Err() + } + return r.conn.Set(ctx, resultPrefix+id, b, r.opts.Expiry).Err() } func (r *Results) Get(ctx context.Context, id string) ([]byte, error) { r.lo.Debug("getting result for job", "id", id) - rs, err := r.conn.Get(ctx, resultPrefix+id).Bytes() if err != nil { return nil, err @@ -171,13 +215,22 @@ func (r *Results) expireMeta(ttl time.Duration) { score := strconv.FormatInt(now, 10) r.lo.Debug("purging failed results metadata", "score", score) - if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+failed, "0", score).Err(); err != nil { - r.lo.Error("could not expire success/failed metadata", "err", err) - } - - r.lo.Debug("purging success results metadata", "score", score) - if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+success, "0", score).Err(); err != nil { - r.lo.Error("could not expire success/failed metadata", "err", err) + if r.opts.PipePeriod != 0 { + if err := r.pipe.ZRemRangeByScore(context.Background(), resultPrefix+failed, "0", score).Err(); err != nil { + r.lo.Error("could not expire success/failed metadata", "err", err) + } + r.lo.Debug("purging success results metadata", "score", score) + if err := r.pipe.ZRemRangeByScore(context.Background(), resultPrefix+success, "0", score).Err(); err != nil { + r.lo.Error("could not expire success/failed metadata", "err", err) + } + } else { + if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+failed, "0", score).Err(); err != nil { + r.lo.Error("could not expire success/failed metadata", "err", err) + } + r.lo.Debug("purging success results metadata", "score", score) + if err := r.conn.ZRemRangeByScore(context.Background(), resultPrefix+success, "0", score).Err(); err != nil { + r.lo.Error("could not expire success/failed metadata", "err", err) + } } } }