diff --git a/go.mod b/go.mod index 10f0426..ccb705c 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,6 @@ require go.uber.org/zap v1.24.0 require ( github.com/stretchr/testify v1.8.1 // indirect - go.uber.org/atomic v1.10.0 // indirect + go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect ) diff --git a/go.sum b/go.sum index 15fd6d6..2b6aebd 100644 --- a/go.sum +++ b/go.sum @@ -12,8 +12,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= -go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= diff --git a/plugins/v2/jobs/driver.go b/plugins/v2/jobs/driver.go new file mode 100644 index 0000000..3e7ff9c --- /dev/null +++ b/plugins/v2/jobs/driver.go @@ -0,0 +1,37 @@ +package jobs + +import "context" + +// Driver represents the interface for a single jobs driver +type Driver interface { + // Push pushes the job to the underlying driver + Push(ctx context.Context, msg Message) error + // Run starts consuming the pipeline + Run(ctx context.Context, pipeline Pipeline) error + // Stop stops the consumer and closes the underlying connection + Stop(ctx context.Context) error + // Pause pauses the jobs consuming (while still allowing job pushing) + Pause(ctx context.Context, pipeline string) error + // Resume resumes the consumer + Resume(ctx context.Context, pipeline string) error + // State returns information about the driver state + State(ctx context.Context) (*State, error) +} + +// Commander provides the ability to send a command to the Jobs plugin +type Commander interface { + // Command returns the command name + Command() Command + // Pipeline returns the associated command pipeline + Pipeline() string +} + +// Constructor constructs Consumer interface. Endure abstraction. +type Constructor interface { + // Name returns the name of the driver + Name() string + // DriverFromConfig constructs a driver (e.g. kafka, amqp) from the configuration using the provided configKey + DriverFromConfig(configKey string, queue Queue, pipeline Pipeline, cmder chan<- Commander) (Driver, error) + // DriverFromPipeline constructs a driver (e.g. kafka, amqp) from the pipeline. All configuration is provided by the pipeline + DriverFromPipeline(pipe Pipeline, queue Queue, cmder chan<- Commander) (Driver, error) +} diff --git a/plugins/v2/jobs/job.go b/plugins/v2/jobs/job.go new file mode 100644 index 0000000..3a7dfa7 --- /dev/null +++ b/plugins/v2/jobs/job.go @@ -0,0 +1,55 @@ +package jobs + +// Message represents the protobuf message received from the RPC call +type Message interface { + Base + KafkaOptions + + // Name returns the name of the Job + Name() string + // Payload returns the data associated with the job + Payload() string + // Delay returns the delay time for the Job (not supported by all drivers) + Delay() int64 + // AutoAck returns the autocommit status for the Job + AutoAck() bool + // UpdatePriority sets the priority of the Job. Priority is optional but cannot be set to 0. + // The default priority is 10 + UpdatePriority(int64) +} + +// KAFKA options (leave them empty for other drivers) +type KafkaOptions interface { + // Offset returns the offset associated with the Job + Offset() int64 + // Partition returns the partition associated with the Job + Partition() int32 + // Topic returns the topic associated with the Job + Topic() string + // Metadata returns the metadata associated with the Job + Metadata() string +} + +type Pipeline interface { + // With sets a pipeline value + With(name string, value interface{}) + // Name returns the pipeline name. + Name() string + // Driver returns the driver associated with the pipeline. + Driver() string + // Has checks if a value is present in the pipeline. + Has(name string) bool + // String returns the value of an option as a string or the default value. + String(name string, d string) string + // Int returns the value of an option as an int or the default value. + Int(name string, d int) int + // Bool returns the value of an option as a bool or the default value. + Bool(name string, d bool) bool + // Map returns the nested map value or an empty config. + // This might be used for SQS attributes or tags, for example + Map(name string, out map[string]string) error + // Priority returns the default pipeline priority + Priority() int64 + // Get is used to retrieve the data associated with a key + Get(key string) interface{} +} diff --git a/plugins/v2/jobs/priority_queue.go b/plugins/v2/jobs/priority_queue.go new file mode 100644 index 0000000..922a0df --- /dev/null +++ b/plugins/v2/jobs/priority_queue.go @@ -0,0 +1,40 @@ +package jobs + +// Queue is a binary heap interface +type Queue interface { + // Remove removes element with provided ID (if exists) and returns that elements + Remove(id string) []Job + // Insert adds an item to the queue + Insert(item Job) + // ExtractMin returns the item with the highest priority (less value is the highest priority) + ExtractMin() Job + // Len returns the number of items in the queue + Len() uint64 +} + +// Job represents a binary heap item +type Job interface { + Base + // Ack acknowledges the item after processing + Ack() error + // Nack discards the item + Nack() error + // Requeue puts the message back to the queue with an optional delay + Requeue(headers map[string][]string, delay int64) error + // Body returns the payload associated with the item + Body() []byte + // Context returns any meta-information associated with the item + Context() ([]byte, error) +} + +// Base interface represents the base meta-information which any message must have +type Base interface { + // ID returns a unique identifier for the item + ID() string + // PipelineID returns the pipeline ID associated with the item + PipelineID() string + // Priority returns the priority level used to sort the item + Priority() int64 + // Headers returns the metadata for the item + Headers() map[string][]string +} diff --git a/plugins/v2/jobs/state.go b/plugins/v2/jobs/state.go new file mode 100644 index 0000000..1da72ef --- /dev/null +++ b/plugins/v2/jobs/state.go @@ -0,0 +1,40 @@ +package jobs + +// constant keys to pack/unpack messages from different drivers +const ( + RRID string = "rr_id" + RRJob string = "rr_job" + RRHeaders string = "rr_headers" + RRPipeline string = "rr_pipeline" + RRDelay string = "rr_delay" + RRPriority string = "rr_priority" + RRAutoAck string = "rr_auto_ack" +) + +type Command string + +const ( + Stop Command = "stop" +) + +// State represents job's state +type State struct { + // Pipeline name + Pipeline string + // Driver name + Driver string + // Queue name (tube for the beanstalk) + Queue string + // Active jobs which are consumed from the driver but not handled by the PHP worker yet + Active int64 + // Delayed jobs + Delayed int64 + // Reserved jobs which are in the driver but not consumed yet + Reserved int64 + // Status - 1 Ready, 0 - Paused + Ready bool + // New in 2.10.5, pipeline priority + Priority uint64 + // ErrorMessage New in 2023.1 + ErrorMessage string +}