-
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
175 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
package jobs | ||
|
||
import "context" | ||
|
||
// Driver represents the interface for a single jobs driver | ||
type Driver interface { | ||
// Push pushes the job to the underlying driver | ||
Push(ctx context.Context, msg Message) error | ||
// Run starts consuming the pipeline | ||
Run(ctx context.Context, pipeline Pipeline) error | ||
// Stop stops the consumer and closes the underlying connection | ||
Stop(ctx context.Context) error | ||
// Pause pauses the jobs consuming (while still allowing job pushing) | ||
Pause(ctx context.Context, pipeline string) error | ||
// Resume resumes the consumer | ||
Resume(ctx context.Context, pipeline string) error | ||
// State returns information about the driver state | ||
State(ctx context.Context) (*State, error) | ||
} | ||
|
||
// Commander provides the ability to send a command to the Jobs plugin | ||
type Commander interface { | ||
// Command returns the command name | ||
Command() Command | ||
// Pipeline returns the associated command pipeline | ||
Pipeline() string | ||
} | ||
|
||
// Constructor constructs Consumer interface. Endure abstraction. | ||
type Constructor interface { | ||
// Name returns the name of the driver | ||
Name() string | ||
// DriverFromConfig constructs a driver (e.g. kafka, amqp) from the configuration using the provided configKey | ||
DriverFromConfig(configKey string, queue Queue, pipeline Pipeline, cmder chan<- Commander) (Driver, error) | ||
// DriverFromPipeline constructs a driver (e.g. kafka, amqp) from the pipeline. All configuration is provided by the pipeline | ||
DriverFromPipeline(pipe Pipeline, queue Queue, cmder chan<- Commander) (Driver, error) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package jobs | ||
|
||
// Message represents the protobuf message received from the RPC call | ||
type Message interface { | ||
Base | ||
KafkaOptions | ||
|
||
// Name returns the name of the Job | ||
Name() string | ||
// Payload returns the data associated with the job | ||
Payload() string | ||
// Delay returns the delay time for the Job (not supported by all drivers) | ||
Delay() int64 | ||
// AutoAck returns the autocommit status for the Job | ||
AutoAck() bool | ||
// UpdatePriority sets the priority of the Job. Priority is optional but cannot be set to 0. | ||
// The default priority is 10 | ||
UpdatePriority(int64) | ||
} | ||
|
||
// KAFKA options (leave them empty for other drivers) | ||
type KafkaOptions interface { | ||
// Offset returns the offset associated with the Job | ||
Offset() int64 | ||
// Partition returns the partition associated with the Job | ||
Partition() int32 | ||
// Topic returns the topic associated with the Job | ||
Topic() string | ||
// Metadata returns the metadata associated with the Job | ||
Metadata() string | ||
} | ||
|
||
type Pipeline interface { | ||
// With sets a pipeline value | ||
With(name string, value interface{}) | ||
// Name returns the pipeline name. | ||
Name() string | ||
// Driver returns the driver associated with the pipeline. | ||
Driver() string | ||
// Has checks if a value is present in the pipeline. | ||
Has(name string) bool | ||
// String returns the value of an option as a string or the default value. | ||
String(name string, d string) string | ||
// Int returns the value of an option as an int or the default value. | ||
Int(name string, d int) int | ||
// Bool returns the value of an option as a bool or the default value. | ||
Bool(name string, d bool) bool | ||
// Map returns the nested map value or an empty config. | ||
// This might be used for SQS attributes or tags, for example | ||
Map(name string, out map[string]string) error | ||
// Priority returns the default pipeline priority | ||
Priority() int64 | ||
// Get is used to retrieve the data associated with a key | ||
Get(key string) interface{} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package jobs | ||
|
||
// Queue is a binary heap interface | ||
type Queue interface { | ||
// Remove removes element with provided ID (if exists) and returns that elements | ||
Remove(id string) []Job | ||
// Insert adds an item to the queue | ||
Insert(item Job) | ||
// ExtractMin returns the item with the highest priority (less value is the highest priority) | ||
ExtractMin() Job | ||
// Len returns the number of items in the queue | ||
Len() uint64 | ||
} | ||
|
||
// Job represents a binary heap item | ||
type Job interface { | ||
Base | ||
// Ack acknowledges the item after processing | ||
Ack() error | ||
// Nack discards the item | ||
Nack() error | ||
// Requeue puts the message back to the queue with an optional delay | ||
Requeue(headers map[string][]string, delay int64) error | ||
// Body returns the payload associated with the item | ||
Body() []byte | ||
// Context returns any meta-information associated with the item | ||
Context() ([]byte, error) | ||
} | ||
|
||
// Base interface represents the base meta-information which any message must have | ||
type Base interface { | ||
// ID returns a unique identifier for the item | ||
ID() string | ||
// PipelineID returns the pipeline ID associated with the item | ||
PipelineID() string | ||
// Priority returns the priority level used to sort the item | ||
Priority() int64 | ||
// Headers returns the metadata for the item | ||
Headers() map[string][]string | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
package jobs | ||
|
||
// constant keys to pack/unpack messages from different drivers | ||
const ( | ||
RRID string = "rr_id" | ||
RRJob string = "rr_job" | ||
RRHeaders string = "rr_headers" | ||
RRPipeline string = "rr_pipeline" | ||
RRDelay string = "rr_delay" | ||
RRPriority string = "rr_priority" | ||
RRAutoAck string = "rr_auto_ack" | ||
) | ||
|
||
type Command string | ||
|
||
const ( | ||
Stop Command = "stop" | ||
) | ||
|
||
// State represents job's state | ||
type State struct { | ||
// Pipeline name | ||
Pipeline string | ||
// Driver name | ||
Driver string | ||
// Queue name (tube for the beanstalk) | ||
Queue string | ||
// Active jobs which are consumed from the driver but not handled by the PHP worker yet | ||
Active int64 | ||
// Delayed jobs | ||
Delayed int64 | ||
// Reserved jobs which are in the driver but not consumed yet | ||
Reserved int64 | ||
// Status - 1 Ready, 0 - Paused | ||
Ready bool | ||
// New in 2.10.5, pipeline priority | ||
Priority uint64 | ||
// ErrorMessage New in 2023.1 | ||
ErrorMessage string | ||
} |