Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: JOBS v2 interface #47

Merged
merged 1 commit into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ require go.uber.org/zap v1.24.0

require (
github.com/stretchr/testify v1.8.1 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
Expand Down
37 changes: 37 additions & 0 deletions plugins/v2/jobs/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package jobs

import "context"

// Driver represents the interface for a single jobs driver
type Driver interface {
// Push pushes the job to the underlying driver
Push(ctx context.Context, msg Message) error
// Run starts consuming the pipeline
Run(ctx context.Context, pipeline Pipeline) error
// Stop stops the consumer and closes the underlying connection
Stop(ctx context.Context) error
// Pause pauses the jobs consuming (while still allowing job pushing)
Pause(ctx context.Context, pipeline string) error
// Resume resumes the consumer
Resume(ctx context.Context, pipeline string) error
// State returns information about the driver state
State(ctx context.Context) (*State, error)
}

// Commander provides the ability to send a command to the Jobs plugin
type Commander interface {
// Command returns the command name
Command() Command
// Pipeline returns the associated command pipeline
Pipeline() string
}

// Constructor constructs Consumer interface. Endure abstraction.
type Constructor interface {
// Name returns the name of the driver
Name() string
// DriverFromConfig constructs a driver (e.g. kafka, amqp) from the configuration using the provided configKey
DriverFromConfig(configKey string, queue Queue, pipeline Pipeline, cmder chan<- Commander) (Driver, error)
// DriverFromPipeline constructs a driver (e.g. kafka, amqp) from the pipeline. All configuration is provided by the pipeline
DriverFromPipeline(pipe Pipeline, queue Queue, cmder chan<- Commander) (Driver, error)
}
55 changes: 55 additions & 0 deletions plugins/v2/jobs/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package jobs

// Message represents the protobuf message received from the RPC call
type Message interface {
Base
KafkaOptions

// Name returns the name of the Job
Name() string
// Payload returns the data associated with the job
Payload() string
// Delay returns the delay time for the Job (not supported by all drivers)
Delay() int64
// AutoAck returns the autocommit status for the Job
AutoAck() bool
// UpdatePriority sets the priority of the Job. Priority is optional but cannot be set to 0.
// The default priority is 10
UpdatePriority(int64)
}

// KAFKA options (leave them empty for other drivers)
type KafkaOptions interface {
// Offset returns the offset associated with the Job
Offset() int64
// Partition returns the partition associated with the Job
Partition() int32
// Topic returns the topic associated with the Job
Topic() string
// Metadata returns the metadata associated with the Job
Metadata() string
}

type Pipeline interface {
// With sets a pipeline value
With(name string, value interface{})
// Name returns the pipeline name.
Name() string
// Driver returns the driver associated with the pipeline.
Driver() string
// Has checks if a value is present in the pipeline.
Has(name string) bool
// String returns the value of an option as a string or the default value.
String(name string, d string) string
// Int returns the value of an option as an int or the default value.
Int(name string, d int) int
// Bool returns the value of an option as a bool or the default value.
Bool(name string, d bool) bool
// Map returns the nested map value or an empty config.
// This might be used for SQS attributes or tags, for example
Map(name string, out map[string]string) error
// Priority returns the default pipeline priority
Priority() int64
// Get is used to retrieve the data associated with a key
Get(key string) interface{}
}
40 changes: 40 additions & 0 deletions plugins/v2/jobs/priority_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package jobs

// Queue is a binary heap interface
type Queue interface {
// Remove removes element with provided ID (if exists) and returns that elements
Remove(id string) []Job
// Insert adds an item to the queue
Insert(item Job)
// ExtractMin returns the item with the highest priority (less value is the highest priority)
ExtractMin() Job
// Len returns the number of items in the queue
Len() uint64
}

// Job represents a binary heap item
type Job interface {
Base
// Ack acknowledges the item after processing
Ack() error
// Nack discards the item
Nack() error
// Requeue puts the message back to the queue with an optional delay
Requeue(headers map[string][]string, delay int64) error
// Body returns the payload associated with the item
Body() []byte
// Context returns any meta-information associated with the item
Context() ([]byte, error)
}

// Base interface represents the base meta-information which any message must have
type Base interface {
// ID returns a unique identifier for the item
ID() string
// PipelineID returns the pipeline ID associated with the item
PipelineID() string
// Priority returns the priority level used to sort the item
Priority() int64
// Headers returns the metadata for the item
Headers() map[string][]string
}
40 changes: 40 additions & 0 deletions plugins/v2/jobs/state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package jobs

// constant keys to pack/unpack messages from different drivers
const (
RRID string = "rr_id"
RRJob string = "rr_job"
RRHeaders string = "rr_headers"
RRPipeline string = "rr_pipeline"
RRDelay string = "rr_delay"
RRPriority string = "rr_priority"
RRAutoAck string = "rr_auto_ack"
)

type Command string

const (
Stop Command = "stop"
)

// State represents job's state
type State struct {
// Pipeline name
Pipeline string
// Driver name
Driver string
// Queue name (tube for the beanstalk)
Queue string
// Active jobs which are consumed from the driver but not handled by the PHP worker yet
Active int64
// Delayed jobs
Delayed int64
// Reserved jobs which are in the driver but not consumed yet
Reserved int64
// Status - 1 Ready, 0 - Paused
Ready bool
// New in 2.10.5, pipeline priority
Priority uint64
// ErrorMessage New in 2023.1
ErrorMessage string
}