Skip to content

Commit

Permalink
feat: golang client POC
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Feb 1, 2024
1 parent 3131dce commit 7641749
Show file tree
Hide file tree
Showing 8 changed files with 399 additions and 0 deletions.
7 changes: 7 additions & 0 deletions clients/golang/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# BullMQ Proxy Client for Golang

This is a experimental BullMQ proxy client for Golang.

## Usage

TODO:
19 changes: 19 additions & 0 deletions clients/golang/examples/example_add_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package main

import (
"fmt"

"taskforce.sh/bullmq_proxy_client/queue"
)

func main() {
q := queue.NewQueue("ws://localhost:8080/queues/test?token=1234")

// Sample usage of your library's functionality.
jobResponse, err := q.AddJob("testJob", map[string]string{"key": "value"}, nil)
if err != nil {
fmt.Println("Error:", err)
return
}
fmt.Println("Job Added:", jobResponse)
}
5 changes: 5 additions & 0 deletions clients/golang/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module taskforce.sh/bullmq_proxy_client

go 1.21.1

require github.com/gorilla/websocket v1.5.0 // indirect
2 changes: 2 additions & 0 deletions clients/golang/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
76 changes: 76 additions & 0 deletions clients/golang/queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package queue

import (
"encoding/json"
"fmt"

"taskforce.sh/bullmq_proxy_client/wsclient"
)

type QueueCommand struct {
Fn string `json:"fn"`
Args []interface{} `json:"args"`
}

type Queue struct {
ws *wsclient.WebSocket[QueueCommand]
}

func NewQueue(url string) *Queue {
var ws = wsclient.New[QueueCommand](url)
return &Queue{ws: ws}
}

type JobResponse struct {
AttemptsMade int `json:"attemptsMade,omitempty"`
Data map[string]interface{} `json:"data,omitempty"`
Delay int `json:"delay,omitempty"`
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Opts JobOpts `json:"opts,omitempty"`
Progress int `json:"progress,omitempty"`
QueueQualifiedName string `json:"queueQualifiedName,omitempty"`
ReturnValue interface{} `json:"returnvalue,omitempty"`
Stacktrace interface{} `json:"stacktrace,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
}

type JobOpts struct {
Attempts *int `json:"attempts,omitempty"`
Delay *int `json:"delay,omitempty"`
}

func (q *Queue) AddJob(name string, data interface{}, opts interface{}) (*JobResponse, error) {
cmd := QueueCommand{
Fn: "add",
Args: []interface{}{name, data, opts},
}

rawData, err := q.ws.SendWebSocketMessage(cmd)
if err != nil {
return nil, fmt.Errorf("Failed to send message: %v", err)
}

if(rawData == nil) {
return nil, fmt.Errorf("Failed to receive response")
}

var jobResp JobResponse
if err := json.Unmarshal(*rawData, &jobResp); err != nil {
return nil, fmt.Errorf("Failed to unmarshal response: %v", err)
}

return &jobResp, nil
}

func (q *Queue) PauseJob() {
cmd := QueueCommand{
Fn: "pause",
Args: []interface{}{},
}
q.ws.SendWebSocketMessage(cmd)
}

func (q *Queue) Close() {
q.ws.Close()
}
69 changes: 69 additions & 0 deletions clients/golang/queue/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package queue

import (
"fmt"

// add other required imports
"taskforce.sh/bullmq_proxy_client/wsclient"
)

type WorkerCommand struct {
Type string `json:"type"`
Payload interface{} `json:"payload"`
}

type JobResult struct {
Result interface{} `json:"result"`
}

type JobError struct {
Message string `json:"message"`
Stack string `json:"stack"`
}

type ProcessorFunc func(job interface{}) (interface{}, error)

type QueueWorker struct {
ws *wsclient.WebSocket[WorkerCommand]
}

func NewWorker(host string, queueName string, token string, concurrency int, processor ProcessorFunc) *QueueWorker {
url := fmt.Sprintf("%s/queues/%s/process/%d?token=%s", host, queueName, concurrency, token)
var ws = wsclient.New[WorkerCommand](url)

var qw = &QueueWorker{ws: ws}
go qw.listen(processor)

return qw;
}

func (qw *QueueWorker) listen(processor ProcessorFunc) {
for {
message, err := qw.ws.ReceiveWebSocketMessage() // assuming this blocks until a message is received
if err != nil {
fmt.Printf("Error receiving message: %v\n", err)
continue
}

fmt.Printf("Receiving message: %v\n", message)
if message.Data.Type == "process" {
go qw.callProcessor(message.ID, message.Data.Payload, processor)
} else {
fmt.Printf("Unknown Worker message type %s\n", message.Data.Type)
}
}
}

func (qw *QueueWorker) callProcessor(id int, data interface{}, processor ProcessorFunc) {
result, err := processor(data)
if err != nil {
qw.ws.Respond(id, &JobError{Message: err.Error(), Stack: ""}) // modify Respond to handle the error response
return
}

qw.ws.Respond(id, &JobResult{Result: result})
}

func (qw *QueueWorker) Close() {
qw.ws.Close()
}
65 changes: 65 additions & 0 deletions clients/golang/tests/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package wsclient

import (
"sync"
"testing"
"time"

"taskforce.sh/bullmq_proxy_client/queue"
)

func TestAddJob(t *testing.T) {
const numJobs = 10
q := queue.NewQueue("ws://localhost:8080/queues/test?token=1234")

// Use a wait group to wait for all goroutines to finish
var wg sync.WaitGroup

// A buffered channel to communicate errors. It can store as many errors as there are jobs.
errCh := make(chan error, 100)

// Add 1000 jobs and benchmark the time it takes to add them.
start := time.Now()

for i := 0; i < numJobs; i++ {
wg.Add(1) // Add a count to the wait group for each goroutine

go func() {
defer wg.Done() // Decrease the count when the goroutine finishes

_, err := q.AddJob("test", map[string]interface{}{"foo": "bar"}, nil)
if err != nil {
errCh <- err // Send any errors to the channel
}
}()
}

// Wait for all goroutines to finish
wg.Wait()
close(errCh) // Close the error channel

elapsed := time.Since(start)
t.Logf("Added %d jobs in %s\n", numJobs, elapsed)

// Check if there were any errors
for err := range errCh {
t.Error("Error adding job:", err)
}

// Channels for signaling
var jobCount int

worker := queue.NewWorker("ws://localhost:8080", "test", "1234", 10, func(job interface{}) (interface{}, error) {
t.Logf("Processing job: %v", job)
jobCount++
return nil, nil
})

// Sleep for a bit to allow the worker to process
time.Sleep(8 * time.Second)

t.Logf("Processed %d of %d jobs", jobCount, numJobs)

// Close the worker now
worker.Close()
}
Loading

0 comments on commit 7641749

Please sign in to comment.