Skip to content

Commit

Permalink
feature: jobs v2 interface
Browse files Browse the repository at this point in the history
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
  • Loading branch information
rustatian committed Jun 14, 2023
1 parent 71d4067 commit cf3beeb
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 3 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ require go.uber.org/zap v1.24.0

require (
github.com/stretchr/testify v1.8.1 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
Expand Down
37 changes: 37 additions & 0 deletions plugins/v2/jobs/driver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package jobs

import "context"

// Driver represents the interface for a single jobs driver
type Driver interface {
// Push pushes the job to the underlying driver
Push(ctx context.Context, msg Message) error
// Run starts consuming the pipeline
Run(ctx context.Context, pipeline Pipeline) error
// Stop stops the consumer and closes the underlying connection
Stop(ctx context.Context) error
// Pause pauses the jobs consuming (while still allowing job pushing)
Pause(ctx context.Context, pipeline string) error
// Resume resumes the consumer
Resume(ctx context.Context, pipeline string) error
// State returns information about the driver state
State(ctx context.Context) (*State, error)
}

// Commander provides the ability to send a command to the Jobs plugin
type Commander interface {
// Command returns the command name
Command() Command
// Pipeline returns the associated command pipeline
Pipeline() string
}

// Constructor constructs Consumer interface. Endure abstraction.
type Constructor interface {
// Name returns the name of the driver
Name() string
// DriverFromConfig constructs a driver (e.g. kafka, amqp) from the configuration using the provided configKey
DriverFromConfig(configKey string, queue Queue, pipeline Pipeline, cmder chan<- Commander) (Driver, error)
// DriverFromPipeline constructs a driver (e.g. kafka, amqp) from the pipeline. All configuration is provided by the pipeline
DriverFromPipeline(pipe Pipeline, queue Queue, cmder chan<- Commander) (Driver, error)
}
55 changes: 55 additions & 0 deletions plugins/v2/jobs/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package jobs

// Message represents the protobuf message received from the RPC call
type Message interface {
Base
KafkaOptions

// Name returns the name of the Job
Name() string
// Payload returns the data associated with the job
Payload() string
// Delay returns the delay time for the Job (not supported by all drivers)
Delay() int64
// AutoAck returns the autocommit status for the Job
AutoAck() bool
// UpdatePriority sets the priority of the Job. Priority is optional but cannot be set to 0.
// The default priority is 10
UpdatePriority(int64)
}

// KAFKA options (leave them empty for other drivers)
type KafkaOptions interface {
// Offset returns the offset associated with the Job
Offset() int64
// Partition returns the partition associated with the Job
Partition() int32
// Topic returns the topic associated with the Job
Topic() string
// Metadata returns the metadata associated with the Job
Metadata() string
}

type Pipeline interface {
// With sets a pipeline value
With(name string, value interface{})
// Name returns the pipeline name.
Name() string
// Driver returns the driver associated with the pipeline.
Driver() string
// Has checks if a value is present in the pipeline.
Has(name string) bool
// String returns the value of an option as a string or the default value.
String(name string, d string) string
// Int returns the value of an option as an int or the default value.
Int(name string, d int) int
// Bool returns the value of an option as a bool or the default value.
Bool(name string, d bool) bool
// Map returns the nested map value or an empty config.
// This might be used for SQS attributes or tags, for example
Map(name string, out map[string]string) error
// Priority returns the default pipeline priority
Priority() int64
// Get is used to retrieve the data associated with a key
Get(key string) interface{}
}
40 changes: 40 additions & 0 deletions plugins/v2/jobs/priority_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package jobs

// Queue is a binary heap interface
type Queue interface {
// Remove removes element with provided ID (if exists) and returns that elements
Remove(id string) []Job
// Insert adds an item to the queue
Insert(item Job)
// ExtractMin returns the item with the highest priority (less value is the highest priority)
ExtractMin() Job
// Len returns the number of items in the queue
Len() uint64
}

// Job represents a binary heap item
type Job interface {
Base
// Ack acknowledges the item after processing
Ack() error
// Nack discards the item
Nack() error
// Requeue puts the message back to the queue with an optional delay
Requeue(headers map[string][]string, delay int64) error
// Body returns the payload associated with the item
Body() []byte
// Context returns any meta-information associated with the item
Context() ([]byte, error)
}

// Base interface represents the base meta-information which any message must have
type Base interface {
// ID returns a unique identifier for the item
ID() string
// PipelineID returns the pipeline ID associated with the item
PipelineID() string
// Priority returns the priority level used to sort the item
Priority() int64
// Headers returns the metadata for the item
Headers() map[string][]string
}
40 changes: 40 additions & 0 deletions plugins/v2/jobs/state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package jobs

// constant keys to pack/unpack messages from different drivers
const (
RRID string = "rr_id"
RRJob string = "rr_job"
RRHeaders string = "rr_headers"
RRPipeline string = "rr_pipeline"
RRDelay string = "rr_delay"
RRPriority string = "rr_priority"
RRAutoAck string = "rr_auto_ack"
)

type Command string

const (
Stop Command = "stop"
)

// State represents job's state
type State struct {
// Pipeline name
Pipeline string
// Driver name
Driver string
// Queue name (tube for the beanstalk)
Queue string
// Active jobs which are consumed from the driver but not handled by the PHP worker yet
Active int64
// Delayed jobs
Delayed int64
// Reserved jobs which are in the driver but not consumed yet
Reserved int64
// Status - 1 Ready, 0 - Paused
Ready bool
// New in 2.10.5, pipeline priority
Priority uint64
// ErrorMessage New in 2023.1
ErrorMessage string
}

0 comments on commit cf3beeb

Please sign in to comment.