Skip to content

Commit

Permalink
feat: add RabbitMQ publisher (#137)
Browse files Browse the repository at this point in the history
* adding rabbit publish handler

* feat: handling model name

* feat: adding converter for payloads

* feat:rabbit-publisher add unit tests and handlers

* feat:rabbit-publisher fixing imports

* feat:rabbit-publisher add more unit tests

* feat:rabbit-publisher cleaning up code

* feat:rabbit-publisher cleaning up payload

* feat:rabbit-publisher adding publisher code

* feat:rabbit-publisher fixing lint checks

* feat:rabbit-publisher fixing tests

* feat: rabbi-publisher cleanup code

* feat: rabbit-publisher adding husky

* feat: addressing issues in PR

* feat: addressing code review comments

* feat: addressing code review comments

* feat: formatting changes

* feat: contribution guide

* feat: corrections to readme

* feat: formatting read me

* feat: add guard for missing operationId
  • Loading branch information
anandsunderraman authored Apr 12, 2022
1 parent a22e085 commit 7d9587d
Show file tree
Hide file tree
Showing 21 changed files with 1,074 additions and 15,576 deletions.
5 changes: 5 additions & 0 deletions .husky/pre-commit
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/sh
. "$(dirname "$0")/_/husky.sh"

npm run lint
npm test
59 changes: 53 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,23 +1,66 @@
[![AsyncAPI logo](./assets/github-repobanner-generic.png)](https://www.asyncapi.com)
<!-- toc is generated with GitHub Actions do not remove toc markers -->

<!-- toc -->

- [Overview](#overview)
- [Template Output](#template-output)
- [Technical requirements](#technical-requirements)
- [Supported protocols](#supported-protocols)
- [How to use the template](#how-to-use-the-template)
* [CLI](#cli)
+ [Run the following command to generate a Go module](#run-the-following-command-to-generate-a-go-module)
+ [How to use the generated code](#how-to-use-the-generated-code)
- [Pre-requisites](#pre-requisites)
- [Running the code](#running-the-code)
- [Template configuration](#template-configuration)
- [Contribution guide](#contribution-guide)

<!-- tocstop -->


## Overview

This template generates a Go module that uses [watermill](https://github.com/ThreeDotsLabs/watermill) as the messaging middleware
This template generates a Go module that uses [watermill](https://github.com/ThreeDotsLabs/watermill) as library for building event-driven applications. The reason to choose [watermill](https://github.com/ThreeDotsLabs/watermill) is because it provides an abstraction to messaging supporting multiple protocols. This will help this generator support multiple protocols.

## Template Output

The `go` code generated by this template has the following structure

- __asyncapi__
- [handlers.go](asyncapi/handlers.go) -- handlers for publishers and subscribers
- [payloads.go](asyncapi/payloads.go) -- generated `go` structs using Modelina
- [publishers.go](asyncapi/publishers.go)
- [router.go](asyncapi/router.go) -- configures [watermill](https://watermill.io/) router, needed only for subscribers
- [server.go](asyncapi/server.go) -- server config
- [subscribers.go](asyncapi/subscribers.go)
- [go.mod](go.mod)
- [go.sum](go.sum)
- [main.go](main.go)

## Technical requirements

- 1.1.0 =< [Generator](https://github.com/asyncapi/generator/) < 2.0.0,
- Generator specific [requirements](https://github.com/asyncapi/generator/#requirements)

## Async API Requirements

To have correctly generated code, your AsyncAPI file MUST define operationId for every operation.

Example:
```
channels:
light/measured:
subscribe:
operationId: LumenPublish
```

## Supported protocols

Currently this template supports AMQP subscribers
| Protocol | Subscriber | Publishers |
|---|---|---|
| AMQP | Yes | Yes |


## How to use the template

Expand Down Expand Up @@ -62,14 +105,14 @@ go mod tidy
```bash
go run main.go
```
5. Running local instance of `rabbitmq`, navigate to it using `http://localhost:15672/` with username and password `guest`/ `guest` (These are default rabbitmq credentials).
5. Running local instance of `rabbitmq`, navigate to it using `http://localhost:15672/` with username and password `guest`/ `guest` (These are default rabbitmq credentials).
FYI one can start an instance of `rabbitmq` using `docker` as follow
```
docker run -d -p 15672:15672 -p 5672:5672 rabbitmq:3-management
```
6. Create a queue as per the AsyncAPI spec.
6. Create a queue as per the AsyncAPI spec.
This can be done either of the following ways
- Using the UI: Refer to this [article](https://www.cloudamqp.com/blog/part3-rabbitmq-for-beginners_the-management-interface.html) that walks through the process of how this can be done in the UI / RabbitMQ Admin
- Using the UI: Refer to this [article](https://www.cloudamqp.com/blog/part3-rabbitmq-for-beginners_the-management-interface.html) that walks through the process of how this can be done in the UI / RabbitMQ Admin
- `cURL` request. Default rabbitmq user is `guest` and password is `guest`
```
curl --user <rabbit-user>:<rabbit-password> -X PUT \
Expand All @@ -82,7 +125,7 @@ go run main.go
}'
```
7. Publish a message to the queue as per the AsyncAPI spec. This can be done either of the following ways
- Using the UI: Refer to this [article](https://www.cloudamqp.com/blog/part3-rabbitmq-for-beginners_the-management-interface.html) that walks through the process of how this can be done in the UI / RabbitMQ Admin
- Using the UI: Refer to this [article](https://www.cloudamqp.com/blog/part3-rabbitmq-for-beginners_the-management-interface.html) that walks through the process of how this can be done in the UI / RabbitMQ Admin
- `cURL` request. Default rabbitmq user is `guest` and password is `guest`
```
curl --user <rabbit-user>:<rabbit-password> -X POST \
Expand All @@ -105,3 +148,7 @@ You can configure this template by passing different parameters in the Generator
|Name|Description|Required|Example|
|---|---|---|---|
|moduleName|Name for the generated Go module|false|`my-app`|

## Contribution guide

If you are interested in contributing to this repo refer to the [contributing](https://github.com/asyncapi/go-watermill-template/docs/contributing.md) docs
80 changes: 70 additions & 10 deletions components/Handlers.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { render } from '@asyncapi/generator-react-sdk';
import { pascalCase } from './common';

const subscriptionFunction = (channelName, operation, message) => `
// ${operation} subscription handler for ${channelName}.
// ${operation} subscription handler for ${channelName}.
func ${operation}(msg *message.Message) error {
log.Printf("received message payload: %s", string(msg.Payload))
Expand All @@ -15,28 +15,88 @@ func ${operation}(msg *message.Message) error {
}
`;

function SubscriptionHandlers({ channels }) {
export function SubscriptionHandlers({ channels }) {
return Object.entries(channels)
.map(([channelName, channel]) => {
if (channel.hasPublish()) {
const operation = pascalCase(channel.publish().id());
const message = pascalCase(channel.publish().message(0).payload().$id());
return subscriptionFunction(channelName, operation, message);
if (!operation) {
throw new Error('This template requires operationId to be set for every operation.');
}

const msgName = channel.publish().message(0).uid();
const message = pascalCase(msgName);
return subscriptionFunction(channelName, operation, message);
}
return '';
}).join('');
}

export function publishConfigsFrom(channelName, channel) {
const msgName = channel.subscribe().message(0).uid();
const message = pascalCase(msgName);
const operation = pascalCase(channel.subscribe().id());
if (!operation) {
throw new Error('This template requires operationId to be set for every operation.');
}
return {
operation,
message,
channelName
};
}

const amqpPublisherFunction = (channelName, operation, message) => `
// ${operation} is the publish handler for ${channelName}.
func ${operation}(ctx context.Context, a *amqp.Publisher, payload ${message}) error {
m, err := PayloadToMessage(payload)
if err != nil {
log.Fatalf("error converting payload: %+v to message error: %s", payload, err)
}
return a.Publish("${channelName}", m)
}
`;

export function PublishHandlers({ channels }) {
return Object.entries(channels)
.map(([channelName, channel]) => {
if (channel.hasSubscribe() && channel.bindings().amqp) {
//generate amqp publisher
const pubConfig = publishConfigsFrom(channelName, channel);
return amqpPublisherFunction(pubConfig.channelName, pubConfig.operation, pubConfig.message);
}
return '';
});
}).join('');
}

export function Imports(channels) {
const dependencies = new Set();
for (const [, channel] of Object.entries(channels)) {
if (channel.hasPublish()) {
dependencies.add(`
"encoding/json"
"github.com/ThreeDotsLabs/watermill/message"`);
}

if (channel.hasSubscribe() && channel.bindings().amqp) {
dependencies.add(`
"context"
"github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp"`);
}
}
return [...dependencies].join('\n');
}

export function Handlers({ moduleName, channels}) {
export function Handlers({channels}) {
return `
package asyncapi
import (
"encoding/json"
"log"
"github.com/ThreeDotsLabs/watermill/message"
${Imports(channels)}
)
${render(<SubscriptionHandlers channels={channels} />)}
${render(<PublishHandlers channels={channels} />)}
`;
}
}
46 changes: 46 additions & 0 deletions components/Publisher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//render an AMQP subscriber
function AMQPPublisher() {
return `
// GetAMQPPublisher returns an amqp publisher based on the URI
func GetAMQPPublisher(amqpURI string) (*amqp.Publisher, error) {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
return amqp.NewPublisher(
amqpConfig,
watermill.NewStdLogger(false, false),
)
}
`;
}

export function Publisher({publisherFlags}) {
const amqpMod = 'github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp';

const modules = [];
const publishers = [];
let importMod = '';
let publisherBlock = '';
if (publisherFlags.hasAMQPPub) {
modules.push(amqpMod);
publishers.push(AMQPPublisher());
}

if (modules.length > 0) {
importMod = modules.map(m => `"${m}"`).join('\n');
}

if (publishers.length > 0) {
publisherBlock = publishers.join('\n');
}

return `
package asyncapi
import (
"github.com/ThreeDotsLabs/watermill"
${importMod}
)
${publisherBlock}
`;
}
60 changes: 57 additions & 3 deletions components/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function GetProtocolFlags(asyncapi) {
}).length > 0;

protocolFlags.hasAMQP = hasAMQP;

return protocolFlags;
}

Expand Down Expand Up @@ -56,11 +56,65 @@ export function GetSubscriberFlags(asyncapi) {
}).length > 0;

subscriberFlags.hasAMQPSub = hasAMQPSub;

return subscriberFlags;
}

/**
* Input: parsed asyncapi object
* Output: object which indicates what protocols have publishers
* Curently supports AMQP alone
* Example Output:
* {
* "hasAMQPPub": true
* }
*/
export function GetPublisherFlags(asyncapi) {
const publisherFlags = {
hasAMQPPub: false
};

const channelEntries = Object.keys(asyncapi.channels()).length ? Object.entries(asyncapi.channels()) : [];
//if there are no channels do nothing
if (channelEntries.length === 0) {
return publisherFlags;
}

//if there are no amqp publisher or subscribers do nothing
const hasAMQPPub = channelEntries.filter(([channelName, channel]) => {
return channel.hasSubscribe() && channel.bindings().amqp;
}).length > 0;

publisherFlags.hasAMQPPub = hasAMQPPub;

return publisherFlags;
}

export function hasPubOrSub(asyncapi) {
return hasPub(asyncapi) || hasSub(asyncapi);
}

export function hasSub(asyncapi) {
const subscriberFlags = GetSubscriberFlags(asyncapi);
for (const protocol in subscriberFlags) {
if (subscriberFlags[`${protocol}`] === true) {
return true;
}
}
return false;
}

export function hasPub(asyncapi) {
const publisherFlags = GetPublisherFlags(asyncapi);
for (const protocol in publisherFlags) {
if (publisherFlags[`${protocol}`] === true) {
return true;
}
}
return false;
}

export function pascalCase(string) {
string = _.camelCase(string);
return string.charAt(0).toUpperCase() + string.slice(1);
}
}
Loading

0 comments on commit 7d9587d

Please sign in to comment.