From 98430acd80423b776149f29d625d158f490ac3c5 Mon Sep 17 00:00:00 2001 From: Sasha Melentyev Date: Thu, 11 Jan 2024 18:36:58 +0300 Subject: [PATCH] [IMPROVED] Use errors.Is for err handling, and use skipped bool (#1500) Signed-off-by: Sasha Melentyev --- jetstream/kv.go | 10 +++++----- js.go | 20 ++++++++++---------- jsm.go | 6 +++--- kv.go | 12 ++++++------ object.go | 4 ++-- timer.go | 2 +- 6 files changed, 27 insertions(+), 27 deletions(-) diff --git a/jetstream/kv.go b/jetstream/kv.go index 44f5b1281..160dd1a6a 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -280,7 +280,7 @@ func (js *jetStream) KeyValue(ctx context.Context, bucket string) (KeyValue, err streamName := fmt.Sprintf(kvBucketNameTmpl, bucket) stream, err := js.Stream(ctx, streamName) if err != nil { - if err == ErrStreamNotFound { + if errors.Is(err, ErrStreamNotFound) { err = ErrBucketNotFound } return nil, err @@ -567,7 +567,7 @@ func (kv *kvs) get(ctx context.Context, key string, revision uint64) (KeyValueEn } } if err != nil { - if err == ErrMsgNotFound { + if errors.Is(err, ErrMsgNotFound) { err = ErrKeyNotFound } return nil, err @@ -619,7 +619,7 @@ func (e *kve) Operation() KeyValueOp { return e.op } func (kv *kvs) Get(ctx context.Context, key string) (KeyValueEntry, error) { e, err := kv.get(ctx, key, kvLatestRevision) if err != nil { - if err == ErrKeyDeleted { + if errors.Is(err, ErrKeyDeleted) { return nil, ErrKeyNotFound } return nil, err @@ -632,7 +632,7 @@ func (kv *kvs) Get(ctx context.Context, key string) (KeyValueEntry, error) { func (kv *kvs) GetRevision(ctx context.Context, key string, revision uint64) (KeyValueEntry, error) { e, err := kv.get(ctx, key, revision) if err != nil { - if err == ErrKeyDeleted { + if errors.Is(err, ErrKeyDeleted) { return nil, ErrKeyNotFound } return nil, err @@ -677,7 +677,7 @@ func (kv *kvs) Create(ctx context.Context, key string, value []byte) (revision u return v, nil } - if e, err := kv.get(ctx, key, kvLatestRevision); err == ErrKeyDeleted { + if e, err := kv.get(ctx, key, kvLatestRevision); errors.Is(err, ErrKeyDeleted) { return kv.Update(ctx, key, value, e.Revision()) } diff --git a/js.go b/js.go index 9b0bd3e07..0c0673058 100644 --- a/js.go +++ b/js.go @@ -545,7 +545,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { } if err != nil { - for r, ttl := 0, o.ttl; err == ErrNoResponders && (r < o.rnum || o.rnum < 0); r++ { + for r, ttl := 0, o.ttl; errors.Is(err, ErrNoResponders) && (r < o.rnum || o.rnum < 0); r++ { // To protect against small blips in leadership changes etc, if we get a no responders here retry. if o.ctx != nil { select { @@ -567,7 +567,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { } } if err != nil { - if err == ErrNoResponders { + if errors.Is(err, ErrNoResponders) { err = ErrNoStreamResponse } return nil, err @@ -1601,7 +1601,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if consumer != _EMPTY_ && !o.skipCInfo { info, err = js.ConsumerInfo(stream, consumer) notFoundErr = errors.Is(err, ErrConsumerNotFound) - lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded + lookupErr = err == ErrJetStreamNotEnabled || errors.Is(err, ErrTimeout) || errors.Is(err, context.DeadlineExceeded) } switch { @@ -2831,7 +2831,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { // are no messages. msg, err = sub.nextMsgWithContext(ctx, true, false) if err != nil { - if err == errNoMessages { + if errors.Is(err, errNoMessages) { err = nil } break @@ -2911,13 +2911,13 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { usrMsg, err = checkMsg(msg, true, noWait) if err == nil && usrMsg { msgs = append(msgs, msg) - } else if noWait && (err == errNoMessages || err == errRequestsPending) && len(msgs) == 0 { + } else if noWait && (errors.Is(err, errNoMessages) || errors.Is(err, errRequestsPending)) && len(msgs) == 0 { // If we have a 404/408 for our "no_wait" request and have // not collected any message, then resend request to // wait this time. noWait = false err = sendReq() - } else if err == ErrTimeout && len(msgs) == 0 { + } else if errors.Is(err, ErrTimeout) && len(msgs) == 0 { // If we get a 408, we will bail if we already collected some // messages, otherwise ignore and go back calling nextMsg. err = nil @@ -3100,7 +3100,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e // are no messages. msg, err := sub.nextMsgWithContext(ctx, true, false) if err != nil { - if err == errNoMessages { + if errors.Is(err, errNoMessages) { err = nil } result.err = err @@ -3177,7 +3177,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e usrMsg, err = checkMsg(msg, true, false) if err != nil { - if err == ErrTimeout { + if errors.Is(err, ErrTimeout) { if reqID != "" && !subjectMatchesReqID(msg.Subject, reqID) { // ignore timeout message from server if it comes from a different pull request continue @@ -3206,7 +3206,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e // checkCtxErr is used to determine whether ErrTimeout should be returned in case of context timeout func (o *pullOpts) checkCtxErr(err error) error { - if o.ctx == nil && err == context.DeadlineExceeded { + if o.ctx == nil && errors.Is(err, context.DeadlineExceeded) { return ErrTimeout } return err @@ -3222,7 +3222,7 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer) resp, err := js.apiRequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil) if err != nil { - if err == ErrNoResponders { + if errors.Is(err, ErrNoResponders) { err = ErrJetStreamNotEnabled } return nil, err diff --git a/jsm.go b/jsm.go index d6c2d8ab1..212c7c020 100644 --- a/jsm.go +++ b/jsm.go @@ -297,7 +297,7 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) { resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil) if err != nil { // todo maybe nats server should never have no responder on this subject and always respond if they know there is no js to be had - if err == ErrNoResponders { + if errors.Is(err, ErrNoResponders) { err = ErrJetStreamNotEnabled } return nil, err @@ -415,7 +415,7 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req) if err != nil { - if err == ErrNoResponders { + if errors.Is(err, ErrNoResponders) { err = ErrJetStreamNotEnabled } return nil, err @@ -1623,7 +1623,7 @@ func (jsc *js) StreamNameBySubject(subj string, opts ...JSOpt) (string, error) { resp, err := jsc.apiRequestWithContext(o.ctx, jsc.apiSubj(apiStreams), j) if err != nil { - if err == ErrNoResponders { + if errors.Is(err, ErrNoResponders) { err = ErrJetStreamNotEnabled } return _EMPTY_, err diff --git a/kv.go b/kv.go index 29566a34b..0864f30cc 100644 --- a/kv.go +++ b/kv.go @@ -359,7 +359,7 @@ func (js *js) KeyValue(bucket string) (KeyValue, error) { stream := fmt.Sprintf(kvBucketNameTmpl, bucket) si, err := js.StreamInfo(stream) if err != nil { - if err == ErrStreamNotFound { + if errors.Is(err, ErrStreamNotFound) { err = ErrBucketNotFound } return nil, err @@ -486,7 +486,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { // the stream. // The same logic applies for KVs created pre 2.9.x and // the AllowDirect setting. - if err == ErrStreamNameAlreadyInUse { + if errors.Is(err, ErrStreamNameAlreadyInUse) { if si, _ = js.StreamInfo(scfg.Name); si != nil { // To compare, make the server's stream info discard // policy same than ours. @@ -558,7 +558,7 @@ func keyValid(key string) bool { func (kv *kvs) Get(key string) (KeyValueEntry, error) { e, err := kv.get(key, kvLatestRevision) if err != nil { - if err == ErrKeyDeleted { + if errors.Is(err, ErrKeyDeleted) { return nil, ErrKeyNotFound } return nil, err @@ -571,7 +571,7 @@ func (kv *kvs) Get(key string) (KeyValueEntry, error) { func (kv *kvs) GetRevision(key string, revision uint64) (KeyValueEntry, error) { e, err := kv.get(key, revision) if err != nil { - if err == ErrKeyDeleted { + if errors.Is(err, ErrKeyDeleted) { return nil, ErrKeyNotFound } return nil, err @@ -608,7 +608,7 @@ func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) { } } if err != nil { - if err == ErrMsgNotFound { + if errors.Is(err, ErrMsgNotFound) { err = ErrKeyNotFound } return nil, err @@ -675,7 +675,7 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) { // TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that // so we need to double check. - if e, err := kv.get(key, kvLatestRevision); err == ErrKeyDeleted { + if e, err := kv.get(key, kvLatestRevision); errors.Is(err, ErrKeyDeleted) { return kv.Update(key, value, e.Revision()) } diff --git a/object.go b/object.go index 9d2866bdb..92267918e 100644 --- a/object.go +++ b/object.go @@ -645,7 +645,7 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) { if ctx != nil { select { case <-ctx.Done(): - if ctx.Err() == context.Canceled { + if errors.Is(ctx.Err(), context.Canceled) { err = ctx.Err() } else { err = ErrTimeout @@ -945,7 +945,7 @@ func (obs *obs) GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, err m, err := obs.js.GetLastMsg(stream, metaSubj) if err != nil { - if err == ErrMsgNotFound { + if errors.Is(err, ErrMsgNotFound) { err = ErrObjectNotFound } return nil, err diff --git a/timer.go b/timer.go index 4fb02ecb4..6edeb4cf8 100644 --- a/timer.go +++ b/timer.go @@ -29,7 +29,7 @@ type timerPool struct { // Get returns a timer that completes after the given duration. func (tp *timerPool) Get(d time.Duration) *time.Timer { - if t, _ := tp.p.Get().(*time.Timer); t != nil { + if t, ok := tp.p.Get().(*time.Timer); ok && t != nil { t.Reset(d) return t }