-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
15 changed files
with
926 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
*.iml | ||
.idea | ||
*.coverprofile |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
VERSION=v$(shell cat VERSION) | ||
|
||
test: | ||
ginkgo -r -cover -race -progress -keepGoing -randomizeAllSpecs -slowSpecThreshold 5 -trace | ||
|
||
release: | ||
git tag $(VERSION) --message "release $(VERSION) ($(shell date '+%Y-%m-%d'))" | ||
git push $(VERSION) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# AMQP | ||
|
||
High-level wrapper [amqp](github.com/streadway/amqp): | ||
|
||
* Consumer/Producer, with [events](github.com/adone/go.events) integration | ||
* Listener/Publisher with fault-tollerance mechanics | ||
* autocreating queues, exchanges & bindings from YAML/JSON configuration |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
1.0.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package amqp_test | ||
|
||
import ( | ||
. "github.com/onsi/ginkgo" | ||
. "github.com/onsi/gomega" | ||
|
||
"testing" | ||
) | ||
|
||
func TestAmqp(t *testing.T) { | ||
RegisterFailHandler(Fail) | ||
RunSpecs(t, "Amqp Suite") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package amqp | ||
|
||
type Binding struct { | ||
Exchange string `yaml:"exchange"` | ||
Key string `yaml:"key"` | ||
} | ||
|
||
type Exchange struct { | ||
Name string `yaml:"name"` | ||
Type string `yaml:"type"` | ||
Durable bool `yaml:"durable"` | ||
Bindings []Binding `yaml:"bindings"` | ||
} | ||
|
||
type Queue struct { | ||
Name string `yaml:"name"` | ||
Key string `yaml:"key"` | ||
Durable bool `yaml:"durable"` | ||
Deletable bool `yaml:"deletable"` | ||
Bindings []Binding `yaml:"bindings"` | ||
Arguments Arguments `yaml:"arguments"` | ||
} | ||
|
||
type Arguments struct { | ||
MessageTTL int64 `yaml:"x-message-ttl"` | ||
DeadLetterExchange string `yaml:"x-dead-letter-exchange"` | ||
} | ||
|
||
func (a Arguments) ToMap() map[string]interface{} { | ||
result := make(map[string]interface{}) | ||
if a.MessageTTL > 0 { | ||
result["x-message-ttl"] = a.MessageTTL | ||
} | ||
if a.DeadLetterExchange != "" { | ||
result["x-dead-letter-exchange"] = a.DeadLetterExchange | ||
} | ||
return result | ||
} | ||
|
||
type Configuration struct { | ||
URL string `yaml:"url"` | ||
Node string `yaml:"node"` | ||
Exchanges []Exchange `yaml:"exchanges"` | ||
Queues []Queue `yaml:"queues"` | ||
} | ||
|
||
func (config *Configuration) GetNodeName() string { | ||
if config.Node == "" { | ||
return "node" | ||
} | ||
return config.Node | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
package amqp | ||
|
||
import ( | ||
"github.com/pkg/errors" | ||
"github.com/streadway/amqp" | ||
"gopkg.in/adone/go.events.v2" | ||
) | ||
|
||
// Consumer consume messages from message broker | ||
type Consumer struct { | ||
*events.Emitter | ||
|
||
MaxMessages int | ||
Queue string | ||
|
||
tag string | ||
channel *amqp.Channel | ||
error chan error | ||
done chan struct{} | ||
} | ||
|
||
// Start consuming messages | ||
func (consumer *Consumer) Start() error { | ||
consumer.done = make(chan struct{}) | ||
consumer.error = make(chan error) | ||
|
||
defer close(consumer.error) | ||
|
||
if err := consumer.channel.Qos(consumer.MaxMessages, 0, false); err != nil { | ||
return err | ||
} | ||
|
||
deliveries, err := consumer.channel.Consume( | ||
consumer.Queue, // name | ||
consumer.tag, // consumerTag | ||
false, // noAck | ||
false, // exclusive | ||
false, // noLocal | ||
false, // noWait | ||
nil, // arguments | ||
) | ||
|
||
if err != nil { | ||
return err | ||
} | ||
|
||
go consumer.listenChannel() | ||
go consumer.listenDeliveries(deliveries) | ||
|
||
return <-consumer.error | ||
} | ||
|
||
// Stop consumimg messages | ||
func (consumer *Consumer) Stop() error { | ||
select { | ||
case <-consumer.done: | ||
return nil | ||
default: | ||
// manual call to channel.Cancel does not fire NotifyCancel, it just close deliveries chan | ||
if err := consumer.channel.Cancel(consumer.tag, false); err != nil { | ||
return errors.Wrap(err, "channel cancel failed") | ||
} | ||
|
||
return nil | ||
} | ||
} | ||
|
||
func (consumer *Consumer) Close() error { | ||
return consumer.channel.Close() | ||
} | ||
|
||
func (consumer *Consumer) listenDeliveries(deliveries <-chan amqp.Delivery) { | ||
defer close(consumer.done) | ||
|
||
for delivery := range deliveries { | ||
consumer.Fire(events.New(ConsumerData, events.WithContext(events.Map{ | ||
"key": delivery.RoutingKey, | ||
"data": Message{delivery}, | ||
"queue": consumer.Queue, | ||
}))) | ||
} | ||
} | ||
|
||
func (consumer *Consumer) listenChannel() { | ||
select { | ||
case reason := <-consumer.channel.NotifyCancel(make(chan string)): | ||
consumer.Fire(events.New(ConsumerCanceled, events.WithContext(events.Map{"consumer": consumer}))) | ||
consumer.error <- errors.Errorf("channel canceled: %s", reason) | ||
case err := <-consumer.channel.NotifyClose(make(chan *amqp.Error)): | ||
consumer.Fire(events.New(ConsumerClosed, events.WithContext(events.Map{"consumer": consumer, "error": err}))) | ||
consumer.error <- err | ||
case <-consumer.done: | ||
consumer.error <- nil | ||
} | ||
} |
Oops, something went wrong.