Skip to content

Commit

Permalink
[#48]: feature: priority queue v2 API
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Jun 21, 2023
2 parents 6a76c60 + 7c3931b commit 44fae05
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 43 deletions.
15 changes: 15 additions & 0 deletions plugins/v1/lock/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package lock

import pq "github.com/roadrunner-server/api/v4/plugins/v2/priority_queue"

// Queue represents Lock plugin queue with it's elements types inside
type Queue interface {
// Remove removes element with provided ID (if exists) and returns that elements
Remove(id string) []pq.Item
// Insert adds an item to the queue
Insert(item pq.Item)
// ExtractMin returns the item with the highest priority (less value is the highest priority)
ExtractMin() pq.Item
// Len returns the number of items in the queue
Len() uint64
}
4 changes: 3 additions & 1 deletion plugins/v2/jobs/driver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package jobs

import "context"
import (
"context"
)

// Driver represents the interface for a single jobs driver
type Driver interface {
Expand Down
38 changes: 36 additions & 2 deletions plugins/v2/jobs/job.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,42 @@
package jobs

import (
pq "github.com/roadrunner-server/api/v4/plugins/v2/priority_queue"
)

// Queue represents JOBS plugin queue with it's elements types inside
type Queue interface {
// Remove removes element with provided ID (if exists) and returns that elements
Remove(id string) []Job
// Insert adds an item to the queue
Insert(item Job)
// ExtractMin returns the item with the highest priority (less value is the highest priority)
ExtractMin() Job
// Len returns the number of items in the queue
Len() uint64
}

// Job represents a binary heap item
type Job interface {
pq.Item
// Ack acknowledges the item after processing
Ack() error
// Nack discards the item
Nack() error
// Requeue puts the message back to the queue with an optional delay
Requeue(headers map[string][]string, delay int64) error
// Body returns the payload associated with the item
Body() []byte
// Context returns any meta-information associated with the item
Context() ([]byte, error)
// Headers returns the metadata for the item
Headers() map[string][]string
}

// Message represents the protobuf message received from the RPC call
type Message interface {
Base
pq.Item
KafkaOptions

// Name returns the name of the Job
Name() string
// Payload returns the data associated with the job
Expand All @@ -16,6 +48,8 @@ type Message interface {
// UpdatePriority sets the priority of the Job. Priority is optional but cannot be set to 0.
// The default priority is 10
UpdatePriority(int64)
// Headers returns the metadata for the item
Headers() map[string][]string
}

// KAFKA options (leave them empty for other drivers)
Expand Down
40 changes: 0 additions & 40 deletions plugins/v2/jobs/priority_queue.go

This file was deleted.

11 changes: 11 additions & 0 deletions plugins/v2/priority_queue/priority_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package priority_queue

// Item interface represents the base meta-information which any priority queue message must have
type Item interface {
// ID returns a unique identifier for the item
ID() string
// GroupID returns the group associated with the item, used to remove all items with the same groupID
GroupID() string
// Priority returns the priority level used to sort the item
Priority() int64
}

0 comments on commit 44fae05

Please sign in to comment.