Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: update to new JOBS API #77

Merged
merged 6 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
go-version: stable

- name: Run linter
uses: golangci/golangci-lint-action@v3.5.0 # Action page: <https://github.com/golangci/golangci-lint-action>
uses: golangci/golangci-lint-action@v3.6.0 # Action page: <https://github.com/golangci/golangci-lint-action>
with:
version: v1.53 # without patch version
only-new-issues: false # show only new issues if it's a pull request
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
strategy:
fail-fast: true
matrix:
go: [ "1.20" ]
go: [ stable ]
os: [ "ubuntu-latest" ]
steps:
- name: Set up Go ${{ matrix.go }}
Expand Down
21 changes: 10 additions & 11 deletions amqpjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ import (
"github.com/goccy/go-json"
"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
pq "github.com/roadrunner-server/api/v4/plugins/v1/priority_queue"
"github.com/roadrunner-server/api/v4/plugins/v2/jobs"
"github.com/roadrunner-server/errors"
jprop "go.opentelemetry.io/contrib/propagators/jaeger"
"go.opentelemetry.io/otel"
Expand All @@ -41,7 +40,7 @@ type Configurer interface {
type Driver struct {
mu sync.Mutex
log *zap.Logger
pq pq.Queue
pq jobs.Queue
pipeline atomic.Pointer[jobs.Pipeline]
consumeAll bool
tracer *sdktrace.TracerProvider
Expand Down Expand Up @@ -85,11 +84,11 @@ type Driver struct {
listeners uint32
delayed *int64
stopCh chan struct{}
stopped uint32
stopped uint64
}

// FromConfig initializes rabbitmq pipeline
func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logger, cfg Configurer, pipeline jobs.Pipeline, pq pq.Queue) (*Driver, error) {
func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logger, cfg Configurer, pipeline jobs.Pipeline, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_amqp_consumer")

if tracer == nil {
Expand Down Expand Up @@ -209,7 +208,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg
}

// FromPipeline initializes consumer from pipeline
func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log *zap.Logger, cfg Configurer, pq pq.Queue) (*Driver, error) {
func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_amqp_consumer_from_pipeline")
if tracer == nil {
tracer = sdktrace.NewTracerProvider()
Expand Down Expand Up @@ -340,7 +339,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log *
return jb, nil
}

func (d *Driver) Push(ctx context.Context, job jobs.Job) error {
func (d *Driver) Push(ctx context.Context, job jobs.Message) error {
const op = errors.Op("rabbitmq_push")
// check if the pipeline registered

Expand All @@ -353,8 +352,8 @@ func (d *Driver) Push(ctx context.Context, job jobs.Job) error {

// load atomic value
pipe := *d.pipeline.Load()
if pipe.Name() != job.Pipeline() {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.Pipeline(), pipe.Name()))
if pipe.Name() != job.GroupID() {
return errors.E(op, errors.Errorf("no such pipeline: %s, actual: %s", job.GroupID(), pipe.Name()))
}

err := d.handleItem(ctx, fromJob(job))
Expand Down Expand Up @@ -584,7 +583,7 @@ func (d *Driver) Stop(ctx context.Context) error {
_, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "amqp_stop")
defer span.End()

atomic.StoreUint32(&d.stopped, 1)
atomic.StoreUint64(&d.stopped, 1)
d.stopCh <- struct{}{}

pipe := *d.pipeline.Load()
Expand All @@ -603,7 +602,7 @@ func (d *Driver) handleItem(ctx context.Context, msg *Item) error {
d.publishChan <- pch
}()

d.prop.Inject(ctx, propagation.HeaderCarrier(msg.Headers))
d.prop.Inject(ctx, propagation.HeaderCarrier(msg.headers))

// convert
table, err := pack(msg.ID(), msg)
Expand Down
45 changes: 31 additions & 14 deletions amqpjobs/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
"github.com/goccy/go-json"
"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/roadrunner-server/api/v4/plugins/v1/jobs"
"github.com/roadrunner-server/api/v4/plugins/v2/jobs"
"github.com/roadrunner-server/errors"
"go.uber.org/zap"
)

var _ jobs.Acknowledger = (*Item)(nil)
var _ jobs.Job = (*Item)(nil)

const (
auto string = "deduced_by_rr"
Expand All @@ -29,7 +29,7 @@ type Item struct {
// Payload is string data (usually JSON) passed to Job broker.
Payload string `json:"payload"`
// Headers with key-values pairs
Headers map[string][]string `json:"headers"`
headers map[string][]string
// Options contains set of PipelineOptions specific to job execution. Can be empty.
Options *Options `json:"options,omitempty"`
}
Expand All @@ -49,6 +49,7 @@ type Options struct {
Queue string `json:"queue,omitempty"`

// private
stopped *uint64
// ack delegates an acknowledgement through the Acknowledger interface that the client or server has finished work on a delivery
ack func(multiply bool) error

Expand Down Expand Up @@ -76,6 +77,10 @@ func (i *Item) ID() string {
return i.Ident
}

func (i *Item) GroupID() string {
return i.Options.Pipeline
}

func (i *Item) Priority() int64 {
return i.Options.Priority
}
Expand All @@ -85,8 +90,8 @@ func (i *Item) Body() []byte {
return strToBytes(i.Payload)
}

func (i *Item) Metadata() map[string][]string {
return i.Headers
func (i *Item) Headers() map[string][]string {
return i.headers
}

// Context packs job context (job, id) into binary payload.
Expand All @@ -104,7 +109,7 @@ func (i *Item) Context() ([]byte, error) {
ID: i.Ident,
Job: i.Job,
Driver: pluginName,
Headers: i.Headers,
Headers: i.headers,
Queue: i.Options.Queue,
Pipeline: i.Options.Pipeline,
},
Expand All @@ -118,13 +123,19 @@ func (i *Item) Context() ([]byte, error) {
}

func (i *Item) Ack() error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
}
if i.Options.Delay > 0 {
atomic.AddInt64(i.Options.delayed, ^int64(0))
}
return i.Options.ack(i.Options.multipleAck)
}

func (i *Item) Nack() error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
}
if i.Options.Delay > 0 {
atomic.AddInt64(i.Options.delayed, ^int64(0))
}
Expand All @@ -133,12 +144,16 @@ func (i *Item) Nack() error {

// Requeue with the provided delay, handled by the Nack
func (i *Item) Requeue(headers map[string][]string, delay int64) error {
if atomic.LoadUint64(i.Options.stopped) == 1 {
return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
}
if i.Options.Delay > 0 {
atomic.AddInt64(i.Options.delayed, ^int64(0))
}

// overwrite the delay
i.Options.Delay = delay
i.Headers = headers
i.headers = headers

err := i.Options.requeueFn(context.Background(), i)
if err != nil {
Expand Down Expand Up @@ -184,13 +199,14 @@ func (d *Driver) fromDelivery(deliv amqp.Delivery) (*Item, error) {
Job: auto,
Ident: id,
Payload: bytesToStr(deliv.Body),
Headers: convHeaders(deliv.Headers),
headers: convHeaders(deliv.Headers),
Options: &Options{
Priority: 10,
Queue: d.queue,
// in case of `deduced_by_rr` type of the JOB, we're sending a queue name
Pipeline: (*d.pipeline.Load()).Name(),
AutoAck: false,
stopped: &d.stopped,
ack: deliv.Ack,
nack: deliv.Nack,
requeueFn: d.handleItem,
Expand All @@ -208,7 +224,7 @@ func (d *Driver) fromDelivery(deliv amqp.Delivery) (*Item, error) {
Job: item.Job,
Ident: item.Ident,
Payload: item.Payload,
Headers: item.Headers,
headers: item.headers,
Options: item.Options,
}

Expand All @@ -229,21 +245,22 @@ func (d *Driver) fromDelivery(deliv amqp.Delivery) (*Item, error) {
item.Options.nack = deliv.Nack
}

item.Options.stopped = &d.stopped
item.Options.delayed = d.delayed
// requeue func
item.Options.requeueFn = d.handleItem
return i, nil
}

func fromJob(job jobs.Job) *Item {
func fromJob(job jobs.Message) *Item {
return &Item{
Job: job.Name(),
Ident: job.ID(),
Payload: job.Payload(),
Headers: job.Headers(),
headers: job.Headers(),
Options: &Options{
Priority: job.Priority(),
Pipeline: job.Pipeline(),
Pipeline: job.GroupID(),
Delay: job.Delay(),
AutoAck: job.AutoAck(),
},
Expand All @@ -252,7 +269,7 @@ func fromJob(job jobs.Job) *Item {

// pack job metadata into headers
func pack(id string, j *Item) (amqp.Table, error) {
h, err := json.Marshal(j.Headers)
h, err := json.Marshal(j.headers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -296,7 +313,7 @@ func (d *Driver) unpack(deliv amqp.Delivery) (*Item, error) {
}

if h, ok := deliv.Headers[jobs.RRHeaders].([]byte); ok {
err := json.Unmarshal(h, &item.Headers)
err := json.Unmarshal(h, &item.headers)
if err != nil {
return nil, errors.E(err, errors.Decode)
}
Expand Down
10 changes: 5 additions & 5 deletions amqpjobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (d *Driver) listener(deliv <-chan amqp.Delivery) {
for msg := range deliv {
del, err := d.fromDelivery(msg)

ctx := otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(del.Headers))
ctx := otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(del.headers))
ctx, span := d.tracer.Tracer(tracerName).Start(ctx, "amqp_listener")

if err != nil {
Expand All @@ -35,7 +35,7 @@ func (d *Driver) listener(deliv <-chan amqp.Delivery) {
}

if d != nil {
del.Headers = nil
del.headers = nil
del.Options = nil
}
continue
Expand All @@ -46,11 +46,11 @@ func (d *Driver) listener(deliv <-chan amqp.Delivery) {
_ = msg.Ack(false)
}

if del.Headers == nil {
del.Headers = make(map[string][]string, 2)
if del.headers == nil {
del.headers = make(map[string][]string, 2)
}

d.prop.Inject(ctx, propagation.HeaderCarrier(del.Headers))
d.prop.Inject(ctx, propagation.HeaderCarrier(del.headers))
// insert job into the main priority queue
d.pq.Insert(del)
span.End()
Expand Down
29 changes: 22 additions & 7 deletions amqpjobs/redial.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (d *Driver) redialer() { //nolint:gocognit,gocyclo
}

// stopped
if atomic.LoadUint32(&d.stopped) == 1 {
if atomic.LoadUint64(&d.stopped) == 1 {
d.log.Debug("redialer stopped")
continue
}
Expand All @@ -58,7 +58,7 @@ func (d *Driver) redialer() { //nolint:gocognit,gocyclo
}

// stopped
if atomic.LoadUint32(&d.stopped) == 1 {
if atomic.LoadUint64(&d.stopped) == 1 {
d.log.Debug("redialer stopped")
continue
}
Expand All @@ -82,7 +82,7 @@ func (d *Driver) redialer() { //nolint:gocognit,gocyclo
}

// stopped
if atomic.LoadUint32(&d.stopped) == 1 {
if atomic.LoadUint64(&d.stopped) == 1 {
d.log.Debug("redialer stopped")
continue
}
Expand All @@ -106,7 +106,7 @@ func (d *Driver) redialer() { //nolint:gocognit,gocyclo
}

// stopped
if atomic.LoadUint32(&d.stopped) == 1 {
if atomic.LoadUint64(&d.stopped) == 1 {
d.log.Debug("redialer stopped")
continue
}
Expand All @@ -129,15 +129,30 @@ func (d *Driver) redialer() { //nolint:gocognit,gocyclo
pch := <-d.publishChan
stCh := <-d.stateChan

// cancel new deliviries
err := pch.Cancel(d.consumeID, false)
if err != nil {
d.log.Error("consumer cancel", zap.Error(err), zap.String("consumerID", d.consumeID))
}

// wait for the listener to stop
for atomic.CompareAndSwapUint32(&d.listeners, 1, 0) {
time.Sleep(time.Millisecond)
}

// remove the items associated with that pipeline from the priority_queue
_ = d.pq.Remove((*d.pipeline.Load()).Name())

if d.deleteQueueOnStop {
msg, err := pch.QueueDelete(d.queue, false, false, false)
var n int
n, err = pch.QueueDelete(d.queue, false, false, false)
if err != nil {
d.log.Error("queue delete", zap.Error(err))
}
d.log.Debug("number of purged messages", zap.Int("count", msg))
d.log.Debug("number of purged messages", zap.Int("count", n))
}

err := pch.Close()
err = pch.Close()
if err != nil {
d.log.Error("publish channel close", zap.Error(err))
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.0
github.com/rabbitmq/amqp091-go v1.8.1
github.com/roadrunner-server/api/v4 v4.3.2
github.com/roadrunner-server/api/v4 v4.5.0
github.com/roadrunner-server/endure/v2 v2.2.1
github.com/roadrunner-server/errors v1.2.0
github.com/stretchr/testify v1.8.4
Expand All @@ -16,7 +16,7 @@ require (
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
go.uber.org/zap v1.24.0
golang.org/x/sys v0.8.0
golang.org/x/sys v0.9.0
)

require (
Expand Down
Loading